Snowpark Migration Accelerator: Códigos de problema para Python

SPRKPY1000

Mensagem: A versão do núcleo de projeto spark-core de origem é xx.xx:xx.x.x, a versão do spark-core suportada pelo snowpark é 2.12:3.1.2, portanto, pode haver diferenças funcionais entre os mapeamentos existentes.

Categoria: Aviso.

Descrição

Esse problema aparece quando a versão do Pyspark do seu código-fonte não é compatível. Isso significa que pode haver diferenças funcionais entre os mapeamentos existentes.

Recomendações adicionais

  • A versão do pyspark verificada pelo SMA para compatibilidade com o Snowpark é de 2.12 a 3.1.2. Se estiver usando uma versão fora desse intervalo, a ferramenta poderá produzir resultados inconsistentes. Você pode alterar a versão do código-fonte que está verificando.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1001

Message**:** This code section has parsing errors

Category**:** Parsing error.

Descrição

Um erro de análise é relatado pelo Snowpark Migration Accelerator (SMA) quando ele não consegue ler ou entender corretamente o código em um arquivo (não consegue «analisar» corretamente o arquivo). Esse código de problema aparece quando um arquivo tem um ou mais erros de análise.

Cenário

Entrada: A mensagem EWI é exibida quando o código tem sintaxe inválida, por exemplo:

def foo():
    x = %%%%%%1###1

Saída: O SMA encontra um erro de análise e comenta o erro de análise adicionando a mensagem EWI correspondente:

def foo():
    x
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(2, 7). Last valid token was 'x' @(2, 5), failed token '=' @(2, 7)
##      = %%%%%%1###1

Recomendações adicionais

  • Check that the file contains valid Python code. (You can use the issues.csv file to find all files with this EWI code to determine which file(s) were not processed by the tool due to parsing error(s).) Many parsing errors occur because only part of the code is input into the tool, so it’s bets to ensure that the code will run in the source. If it is valid, report that you encountered a parsing error using the Report an Issue option in the SMA. Include the line of code that was causing a parsing error in the description when you file this issue.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1002

Message**:** < element > is not supported,Spark element is not supported.

Category**:** Conversion error.

Descrição

Esse problema aparece quando a ferramenta detecta o uso de um elemento que não é compatível com o Snowpark e não tem seu próprio código de erro associado a ele. Esse é o código de erro genérico usado pelo SMA para um elemento sem suporte.

Recomendações adicionais

  • Mesmo que a opção ou o elemento da mensagem não seja compatível, isso não significa que não seja possível encontrar uma solução. Isso significa apenas que a ferramenta em si não consegue encontrar a solução.

  • Se encontrou um elemento não suportado de uma biblioteca pyspark.ml, considere uma abordagem alternativa. Há guias adicionais disponíveis para solucionar problemas relacionados ao ml, como este do Snowflake.

  • Verifique se o código-fonte tem a sintaxe correta. (Você pode usar o arquivo issues.csv para determinar onde estão ocorrendo os erros de conversão) Se a sintaxe estiver correta, informe que encontrou um erro de conversão em um elemento específico usando a opção de Relatar um problema no SMA. Inclua a linha de código que estava causando o erro na descrição quando registrar esse problema.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1003

Message**:** An error occurred when loading the symbol table.

Category**:** Conversion error.

Descrição

Esse problema aparece quando há um erro ao processar os símbolos na tabela de símbolos. A tabela de símbolos faz parte da arquitetura subjacente do SMA, permitindo conversões mais complexas. Esse erro pode ser devido a uma declaração inesperada no código-fonte.

Recomendações adicionais

  • This is unlikely to be an error in the source code itself, but rather is an error in how the tool processes the source code. The best resolution would be to post an issue in the SMA.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1004

Message**:** The symbol table could not be loaded.

Category**:** Parsing error.

Descrição

Esse problema aparece quando há um erro inesperado no processo de execução da ferramenta. Como a tabela de símbolos não pode ser carregada, a ferramenta não pode iniciar o processo de avaliação ou conversão.

Recomendações adicionais

SPRKPY1005

Aviso

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message**:** pyspark.conf.SparkConf is not required

Category**:** Warning.

Descrição

This issue appears when the tool detects the usage of pyspark.conf.SparkConf which is not required.

Cenário

Entrada

SparkConf pode ser chamado sem parâmetros ou com loadDefaults.

from pyspark import SparkConf

my_conf = SparkConf(loadDefaults=True)

Saída

For both cases (with or without parameters) SMA creates a Snowpark Session.builder object:

#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
#from pyspark import SparkConf
pass

#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
my_conf = Session.builder.configs({"user" : "my_user", "password" : "my_password", "account" : "my_account", "role" : "my_role", "warehouse" : "my_warehouse", "database" : "my_database", "schema" : "my_schema"}).create()

Recomendações adicionais

  • Esse é um parâmetro desnecessário que está sendo removido com a inserção de um comentário de aviso. Não deve haver nenhuma ação adicional por parte do usuário.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1006

Aviso

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message**:** pyspark.context.SparkContext is not required

Category**:** Warning.

Descrição

This issue appears when the tool detects the usage of pyspark.context.SparkContext, which is not required in Snowflake.

Cenário

Entrada

Neste exemplo, há dois contextos para criar conexões com um Spark Cluster

from pyspark import SparkContext

sql_context1 = SparkContext(my_sc1)
sql_context2 = SparkContext(sparkContext=my_sc2)

Saída

Como não há clusters no Snowflake, o contexto não é necessário. Observe que as variáveis my_sc1 e my_sc2, que contêm propriedades do Spark, podem não ser necessárias ou terão de ser adaptadas para corrigir o código.

from snowflake.snowpark import Session
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required

sql_context2 = my_sc2

Recomendações adicionais

  • Esse é um parâmetro desnecessário que está sendo removido com a inserção de um comentário de aviso. Não deve haver nenhuma ação por parte do usuário.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1007

Aviso

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message**:** pyspark.sql.context.SQLContext is not required

Category**:** Warning.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.context.SQLContext, which is not required.

Cenário

Entrada

Aqui temos um exemplo com diferentes sobrecargas de SparkContext.

from pyspark import SQLContext

my_sc1 = SQLContext(myMaster, myAppName, mySparkHome, myPyFiles, myEnvironment, myBatctSize, mySerializer, my_conf1)
my_sc2 = SQLContext(conf=my_conf2)
my_sc3 = SQLContext()

Saída

O código de saída comentou a linha do pyspark.SQLContext, e substitui os cenários por uma referência a uma configuração. Observe que as variáveis my_sc1 e my_sc2 que contêm propriedades do Spark podem não ser necessárias ou terão de ser adaptadas para corrigir o código.

#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
#from pyspark import SQLContext
pass

#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context2 = my_sc2

Recomendações adicionais

  • Esse é um parâmetro desnecessário e é removido com um comentário de aviso inserido no código-fonte. Não deve haver nenhuma ação por parte do usuário.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1008

Mensagem: pyspark.sql.context.HiveContext não é necessário

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.context.HiveContext, which is not required.

Cenário

Entrada

Este é um exemplo para criar uma conexão com um armazenamento do Hive.

from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
df = hive_context.table("myTable")
df.show()

Saída

In Snowflake there are not Hive stores, so the Hive Context is not required, You can still use parquet files on Snowflake please check this tutorial to learn how.

#EWI: SPRKPY1008 => pyspark.sql.context.HiveContext is not required
hive_context = sc
df = hive_context.table("myTable")
df.show()

the sc variable refers to a Snow Park Session Object

Correção recomendada

For the output code in the example you should add the Snow Park Session Object similar to this code:

## Here manually we can add the Snowpark Session object via a json config file called connection.json
import json
from snowflake.snowpark import Session
jsonFile = open("connection.json")
connection_parameter = json.load(jsonFile)
jsonFile.close()
sc = Session.builder.configs(connection_parameter).getOrCreate()

hive_context = sc
df = hive_context.table("myTable")
df.show()

Recomendações adicionais

SPRKPY1009

Message**:** pyspark.sql.dataframe.DataFrame.approxQuantile has a workaround

Category**:** Warning.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrame.approxQuantile which has a workaround.

Cenário

Entrada

It’s important understand that Pyspark uses two different approxQuantile functions, here we use the DataFrame approxQuantile version

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Sun', 10],
        ['Mon', 64],
        ['Thr', 12],
        ['Wen', 15],
        ['Thu', 68],
        ['Fri', 14],
        ['Sat', 13]]

columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)

Saída

O SMA retorna o EWI SPRKPY1009 sobre a linha em que approxQuantile é usado, para que você possa identificar onde corrigir.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Sun', 10],
        ['Mon', 64],
        ['Thr', 12],
        ['Wen', 15],
        ['Thu', 68],
        ['Fri', 14],
        ['Sat', 13]]

columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1009 => pyspark.sql.dataframe.DataFrame.approxQuantile has a workaround, see documentation for more info
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)

Correção recomendada

Use Snowpark approxQuantile method. Some parameters don’t match so they require some manual adjustments. for the output code’s example a recommended fix could be:

from snowflake.snowpark import Session
...
df = spark.createDataFrame(data, columns)

df.stat.approx_quantile('Ammount', [0.25, 0.5, 0.75])

pyspark.sql.dataframe.DataFrame.approxQuantile relativeError parameter não existe no SnowPark.

Recomendações adicionais

SPRKPY1010

Mensagem: pyspark.sql.dataframe.DataFrame.checkpoint tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrame.checkpoint which has a workaround.

Cenário

Entrada

Em PySpark, os pontos de controle são usados para truncar o plano lógico de um dataframe, para evitar o crescimento de um plano lógico.

import tempfile
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
    spark.sparkContext.setCheckpointDir("/tmp/bb")
    df.checkpoint(False)

Saída

SMA returns the EWI SPRKPY1010 over the line where approxQuantile is used, so you can use to identify where to fix. Note that also marks the setCheckpointDir as unsupported, but a checpointed directory is not required for the fix.

import tempfile
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
    #EWI: SPRKPY1002 => pyspark.context.SparkContext.setCheckpointDir is not supported
    spark.setCheckpointDir("/tmp/bb")
    #EWI: SPRKPY1010 => pyspark.sql.dataframe.DataFrame.checkpoint has a workaround, see documentation for more info
    df.checkpoint(False)

Correção recomendada

O Snowpark elimina a necessidade de pontos de verificação explícitos: isso ocorre porque o Snowpark trabalha com operações baseadas em SQLque são otimizadas pelo mecanismo de otimização de consultas do Snowflake, eliminando a necessidade de cálculos não correspondidos ou planos lógicos que ficam fora de controle.

However there could be scenarios where you would require persist the result of a computation on a dataframe. In this scenarios you can save materialize the results by writing the dataframe on a Snowflake Table or in a Snowflake Temporary Table.

  • Com o uso de uma tabela permanente, o resultado computado pode ser acessado a qualquer momento, mesmo após o término da sessão.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_table", table_type="temporary") # Save the dataframe into Snowflake table "my_table".
df2 = Session.table("my_table") # Now I can access the stored result quering the table "my_table"
  • Uma solução alternativa, o uso de uma tabela temporária, tem a vantagem de que a tabela é excluída após o término da sessão:

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_temp_table", table_type="temporary") # Save the dataframe into Snowflake table "my_temp_table".
df2 = Session.table("my_temp_table") # Now I can access the stored result quering the table "my_temp_table"

Recomendações adicionais

SPRKPY1011

Mensagem: pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile which has a workaround.

Cenário

Entrada

It’s important understand that Pyspark uses two different approxQuantile functions, here we use the DataFrameStatFunctions approxQuantile version.

import tempfile
from pyspark.sql import SparkSession, DataFrameStatFunctions
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)
print(aprox_quantille)

Saída

O SMA retorna o EWI SPRKPY1011 sobre a linha em que approxQuantile é usado, para que você possa identificar onde corrigir.

import tempfile
from snowflake.snowpark import Session, DataFrameStatFunctions
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1011 => pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile has a workaround, see documentation for more info
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)

Correção recomendada

You can use Snowpark approxQuantile method. Some parameters don’t match so they require some manual adjustments. for the output code’s example a recommended fix could be:

from snowflake.snowpark import Session # remove DataFrameStatFunctions because is not required
...
df = spark.createDataFrame(data, columns)

aprox_quantille = df.stat.approx_quantile('Ammount', [0.25, 0.5, 0.75])

pyspark.sql.dataframe.DataFrame.approxQuantile relativeError parameter não existe no SnowPark.

Recomendações adicionais

SPRKPY1012

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.dataframe.DataFrameStatFunctions.writeTo tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrameStatFunctions.writeTo which has a workaround.

Cenário

Entrada

Neste exemplo, o dataframe df é gravado em uma tabela Spark «table».

writer = df.writeTo("table")

Saída

O SMA retorna o EWI SPRKPY1012 sobre a linha em que DataFrameStatFunctions.writeTo é usado, para que você possa identificar o local a ser corrigido.

#EWI: SPRKPY1012 => pyspark.sql.dataframe.DataFrameStatFunctions.writeTo has a workaround, see documentation for more info
writer = df.writeTo("table")

Correção recomendada

Em vez disso, use df.write.SaveAsTable().

import df.write as wt
writer = df.write.save_as_table(table)

Recomendações adicionais

SPRKPY1013

Mensagem: pyspark.sql.functions.acosh tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.acosh which has a workaround.

Cenário

Entrada

On this example pyspark calculates the acosh for a dataframe by using pyspark.sql.functions.acosh

from pyspark.sql import SparkSession
from pyspark.sql.functions import acosh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))

Saída

O SMA retorna o EWI SPRKPY1013 sobre a linha em que acosh é usado, para que você possa identificar onde corrigir.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1013 => pyspark.sql.functions.acosh has a workaround, see documentation for more info
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))

Correção recomendada

There is no direct «acosh» implementation but «call_function» can be used instead, using «acosh» as the first parameter, and colName as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.select(call_function('ACOSH', col('value')))

Recomendações adicionais

SPRKPY1014

Mensagem: pyspark.sql.functions.asinh tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.asinh which has a workaround.

Cenário

Entrada

On this example pyspark calculates the asinh for a dataframe by using pyspark.sql.functions.asinh.

from pyspark.sql import SparkSession
from pyspark.sql.functions import asinh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("asinh_value", asinh(df["value"]))

Saída

O SMA retorna o EWI SPRKPY1014 sobre a linha em que o asinh é usado, para que você possa identificar onde corrigir.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1014 => pyspark.sql.functions.asinh has a workaround, see documentation for more info
df_result = df.withColumn("asinh_value", asinh(df["value"]))

Correção recomendada

There is no direct «asinh» implementation but «call_function» can be used instead, using «asinh» as the first parameter, and colName as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('asinh', col('value')))

Recomendações adicionais

SPRKPY1015

Mensagem: pyspark.sql.functions.atanh tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.atanh which has a workaround.

Cenário

Entrada

On this example pyspark calculates the atanh for a dataframe by using pyspark.sql.functions.atanh.

from pyspark.sql import SparkSession
from pyspark.sql.functions import atanh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("atanh_value", atanh(df["value"]))

Saída

SMA retorna o EWI SPRKPY1015 sobre a linha em que atanh é usado, para que você possa usar para identificar onde corrigir.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1015 => pyspark.sql.functions.atanh has a workaround, see documentation for more info
df_result = df.withColumn("atanh_value", atanh(df["value"]))

Correção recomendada

There is no direct «atanh» implementation but «call_function» can be used instead, using «atanh» as the first parameter, and colName as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('atanh', col('value')))

Recomendações adicionais

SPRKPY1016

Aviso

This issue code has been deprecated since Spark Conversion Core Version 0.11.7

Mensagem: pyspark.sql.functions.collect_set tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.collect_set which has a workaround.

Cenário

Entrada

Using collect*set to get the elements of _colname* without duplicates:

col = collect_set(colName)

Saída

O SMA retorna o EWI SPRKPY1016 sobre a linha em que collect_set é usado, para que você possa identificar onde corrigir.

#EWI: SPRKPY1016 => pyspark.sql.functions.collect_set has a workaround, see documentation for more info
col = collect_set(colName)

Correção recomendada

Use a função array_agg e adicione um segundo argumento com o valor True.

col = array_agg(col, True)

Recomendações adicionais

SPRKPY1017

Aviso

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

pyspark.sql.functions.date_add tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.date_add which has a workaround.

Cenário

Entrada

Neste exemplo, usamos date_add para calcular a data 5 dias após a data atual para o dataframe df.

col = df.select(date_add(df.colName, 5))

Saída

SMA retorna o EWI SPRKPY1017 sobre a linha em que date_add é usado, para que você possa identificar onde corrigir.

#EWI: SPRKPY1017 => pyspark.sql.functions.date_add has a workaround, see documentation for more info
col = df.select(date_add(df.colName, 5))

Correção recomendada

Import snowflake.snowpark.functions, which contains an implementation for date_add (and alias dateAdd) function.

from snowflake.snowpark.functions import date_add

col = df.select(date_add(df.dt, 1))

Recomendações adicionais

SPRKPY1018

Aviso

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Mensagem: pyspark.sql.functions.date_sub tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.date_sub which has a workaround.

Cenário

Entrada

Neste exemplo, usamos date_add para calcular a data 5 dias antes da data atual para o dataframe df.

col = df.select(date_sub(df.colName, 5))

Saída

O SMA retorna o EWI SPRKPY1018 sobre a linha em que date_sub é usado, para que você possa identificar onde corrigir.

#EWI: SPRKPY1018 => pyspark.sql.functions.date_sub has a workaround, see documentation for more info
col = df.select(date_sub(df.colName, 5))

Correção recomendada

Import snowflake.snowpark.functions, which contains an implementation for date_sub function.

from pyspark.sql.functions import date_sub
df.withColumn("date", date_sub(df.colName, 5))

Recomendações adicionais

SPRKPY1019

Aviso

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Mensagem: pyspark.sql.functions.datediff tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.datediff which has a workaround.

Cenário

Entrada

Neste exemplo, usamos o datediff para calcular a diferença de dia entre «today» e outras datas.

contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
           )

Saída

O SMA retorna o EWI SPRKPY1019 sobre a linha em que datediff é usado, para que você possa identificar onde corrigir.

from pyspark.sql.functions import datediff
#EWI: SPRKPY1019 => pyspark.sql.functions.datediff has a workaround, see documentation for more info
contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
           )

SMA convert pyspark.sql.functions.datediff onto snowflake.snowpark.functions.daydiff that also calculates the diference in days between two dates.

Correção recomendada

datediff(part: string ,end: ColumnOrName, start: ColumnOrName)

Action: Import snowflake.snowpark.functions, which contains an implementation for datediff function that requires an extra parameter for date time part and allows more versatility on calculate differences between dates.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import datediff
contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff('day', lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff('day',lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff('day', lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff('day', lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff('day', lit(today),'vms_lastuserlogin'))
           )

Recomendação

SPRKPY1020

Mensagem: pyspark.sql.functions.instr tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.instr which has a workaround.

Cenário

Entrada

Aqui está um exemplo básico de uso do pyspark instr:

from pyspark.sql import SparkSession
from pyspark.sql.functions import instr
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(instr(df.test, 'cd').alias('result')).collect()

Saída:

O SMA retorna o EWI SPRKPY1020 sobre a linha em que o instr é usado, para que você possa identificar onde corrigir.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
#EWI: SPRKPY1020 => pyspark.sql.functions.instr has a workaround, see documentation for more info
df.select(instr(df.test, 'cd').alias('result')).collect()

Correção recomendada

Requires a manual change by using the function charindex and changing the order of the first two parameters.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import charindex, lit

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(charindex(lit('cd'), df.test).as_('result')).show()

Recomendações adicionais

SPRKPY1021

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.functions.last tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.functions.last function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.functions.last function that generates this EWI. In this example, the last function is used to get the last value for each name.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))

Saída

The SMA adds the EWI SPRKPY1021 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
#EWI: SPRKPY1021 => pyspark.sql.functions.last has a workaround, see documentation for more info
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))

Correção recomendada

As a workaround, you can use the Snowflake LAST_VALUE function. To invoke this function from Snowpark, use the snowflake.snowpark.functions.call_builtin function and pass the string last_value as the first argument and the corresponding column as the second argument. If you were using the name of the column in the last function, you should convert it into a column when calling the call_builtin function.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(call_builtin("last_value", col("value")).alias("last_value"))

Recomendações adicionais


description: >- The mode parameter in the methods of CSV, JSON and PARQUET is transformed to overwrite


SPRKPY1022

Mensagem: pyspark.sql.functions.log10 tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.functions.log10 function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.functions.log10 function that generates this EWI. In this example, the log10 function is used to calculate the base-10 logarithm of the value column.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))

Saída

The SMA adds the EWI SPRKPY1022 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
#EWI: SPRKPY1022 => pyspark.sql.functions.log10 has a workaround, see documentation for more info
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))

Correção recomendada

As a workaround, you can use the snowflake.snowpark.functions.log function by passing the literal value 10 as the base.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log(10, df["value"]))

Recomendações adicionais

SPRKPY1023

Mensagem: pyspark.sql.functions.log1p tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.functions.log1p function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.functions.log1p function that generates this EWI. In this example, the log1p function is used to calculate the natural logarithm of the value column.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))

Saída

The SMA adds the EWI SPRKPY1023 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
#EWI: SPRKPY1023 => pyspark.sql.functions.log1p has a workaround, see documentation for more info
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))

Correção recomendada

As a workaround, you can use the call_function function by passing the string ln as the first argument and by adding 1 to the second argument.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", call_function("ln", lit(1) + df["value"]))

Recomendações adicionais

SPRKPY1024

Mensagem: pyspark.sql.functions.log2 tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.functions.log2 function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.functions.log2 function that generates this EWI. In this example, the log2 function is used to calculate the base-2 logarithm of the value column.

df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))

Saída

The SMA adds the EWI SPRKPY1024 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
#EWI: SPRKPY1024 => pyspark.sql.functions.log2 has a workaround, see documentation for more info
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))

Correção recomendada

As a workaround, you can use the snowflake.snowpark.functions.log function by passing the literal value 2 as the base.

df = session.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log(2, df["value"]))

Recomendações adicionais

SPRKPY1025

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.functions.ntile tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.functions.ntile function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.functions.ntile function that generates this EWI. In this example, the ntile function is used to divide the rows into 3 buckets.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))

Saída

The SMA adds the EWI SPRKPY1025 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
#EWI: SPRKPY1025 => pyspark.sql.functions.ntile has a workaround, see documentation for more info
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))

Correção recomendada

Snowpark has an equivalent ntile function, however, the argument pass to it should be a column. As a workaround, you can convert the literal argument into a column using the snowflake.snowpark.functions.lit function.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(lit(3)).over(windowSpec))

Recomendações adicionais

SPRKPY1026

Aviso

This issue code has been deprecated since Spark Conversion Core 4.3.2

Mensagem: pyspark.sql.readwriter.DataFrameReader.csv tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.csv function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.csv function that generates this EWI. In this example, the csv function is used to read multiple .csv files with a given schema and uses some extra options such as encoding, header and sep to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.csv",
  "path/to/your/file2.csv",
  "path/to/your/file3.csv",
]

df = session.read.csv(
  file_paths,
  schema=my_schema,
  encoding="UTF-8",
  header=True,
  sep=","
)

Saída

The SMA adds the EWI SPRKPY1026 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.csv",
  "path/to/your/file2.csv",
  "path/to/your/file3.csv",
]

#EWI: SPRKPY1026 => pyspark.sql.readwriter.DataFrameReader.csv has a workaround, see documentation for more info
df = session.read.csv(
  file_paths,
  schema=my_schema,
  encoding="UTF-8",
  header=True,
  sep=","
)

Correção recomendada

In this section, we explain how to configure the path parameter, the schema parameter and some options to make them work in Snowpark.

1. parâmetro path

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .csv file to that stage using the prefix file://.

2. parâmetro schema

Snowpark does not allow defining the schema as a parameter of the csv function. As a workaround, you can use the snowflake.snowpark.DataFrameReader.schema function.

3. parâmetros de options

Snowpark does not allow defining the extra options as parameters of the csv function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Nota

As seguintes opções não são suportadas pelo Snowpark:

  • columnNameOfCorruptRecord

  • emptyValue

  • enforceSchema

  • header

  • ignoreLeadingWhiteSpace

  • ignoreTrailingWhiteSpace

  • inferSchema

  • locale

  • maxCharsPerColumn

  • maxColumns

  • mode

  • multiLine

  • nanValue

  • negativoInf

  • nullValue

  • positivoInf

  • quoteAll

  • samplingRatio

  • timestampNTZFormat

  • unescapedQuoteHandling

Abaixo está o exemplo completo de como o código de entrada deve ficar depois de aplicar as sugestões mencionadas acima para fazê-lo funcionar no Snowpark:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.csv", f"@{stage}")

df = session.read.schema(my_schema).option("encoding", "UTF-8").option("sep", ",").csv(stage)

Recomendações adicionais

SPRKPY1027

Aviso

This issue code has been deprecated since Spark Conversion Core 4.5.2

Mensagem: pyspark.sql.readwriter.DataFrameReader.json tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.json function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.json function that generates this EWI. In this example, the json function is used to read multiple .json files with a given schema and uses some extra options such as primitiveAsString and dateFormat to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.json",
  "path/to/your/file2.json",
  "path/to/your/file3.json",
]

df = session.read.json(
  file_paths,
  schema=my_schema,
  primitiveAsString=True,
  dateFormat="2023-06-20"
)

Saída

The SMA adds the EWI SPRKPY1027 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.json",
  "path/to/your/file2.json",
  "path/to/your/file3.json",
]

#EWI: SPRKPY1027 => pyspark.sql.readwriter.DataFrameReader.json has a workaround, see documentation for more info
df = session.read.json(
  file_paths,
  schema=my_schema,
  primitiveAsString=True,
  dateFormat="2023-06-20"
)

Correção recomendada

In this section, we explain how to configure the path parameter, the schema parameter and some options to make them work in Snowpark.

1. parâmetro path

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .json file to that stage using the prefix file://.

2. parâmetro schema

Snowpark does not allow defining the schema as a parameter of the json function. As a workaround, you can use the snowflake.snowpark.DataFrameReader.schema function.

3. parâmetros de options

Snowpark does not allow defining the extra options as parameters of the json function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Nota

As seguintes opções não são suportadas pelo Snowpark:

  • allowBackslashEscapingAnyCharacter

  • allowComments

  • allowNonNumericNumbers

  • allowNumericLeadingZero

  • allowSingleQuotes

  • allowUnquotedControlChars

  • allowUnquotedFieldNames

  • columnNameOfCorruptRecord

  • dropFiledIfAllNull

  • encoding

  • ignoreNullFields

  • lineSep

  • locale

  • mode

  • multiline

  • prefereDecimal

  • primitiveAsString

  • samplingRatio

  • timestampNTZFormat

  • timeZone

Abaixo está o exemplo completo de como o código de entrada deve ficar depois de aplicar as sugestões mencionadas acima para fazê-lo funcionar no Snowpark:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.json", f"@{stage}")

df = session.read.schema(my_schema).option("dateFormat", "2023-06-20").json(stage)

Recomendações adicionais

SPRKPY1028

Mensagem: pyspark.sql.readwriter.DataFrameReader.orc tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.orc function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.orc function that generates this EWI. In this example, the orc function is used to read multiple .orc files and uses some extra options such as mergeSchema and recursiveFileLookup to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.orc",
  "path/to/your/file2.orc",
  "path/to/your/file3.orc",
]

df = session.read.orc(
  file_paths,
  mergeSchema="True",
  recursiveFileLookup="True"
)

Saída

The SMA adds the EWI SPRKPY1028 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.orc",
  "path/to/your/file2.orc",
  "path/to/your/file3.orc",
]

#EWI: SPRKPY1028 => pyspark.sql.readwriter.DataFrameReader.orc has a workaround, see documentation for more info
df = session.read.orc(
  file_paths,
  mergeSchema="True",
  recursiveFileLookup="True"
)

Correção recomendada

In this section, we explain how to configure the path parameter and the extra options to make them work in Snowpark.

1. parâmetro path

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .orc file to that stage using the prefix file://.

2. parâmetros options

Snowpark does not allow defining the extra options as parameters of the orc function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Nota

As seguintes opções não são suportadas pelo Snowpark:

  • compression

  • mergeSchema

Abaixo está o exemplo completo de como o código de entrada deve ficar depois de aplicar as sugestões mencionadas acima para fazê-lo funcionar no Snowpark:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.orc", f"@{stage}")

df = session.read.option(recursiveFileLookup, "True").orc(stage)

Recomendações adicionais

SPRKPY1029

Mensagem: Esse problema aparece quando a ferramenta detecta o uso do pyspark.sql.readwriter.DataFrameReader.parquet. Essa função é compatível, mas algumas das diferenças entre o Snowpark e o Spark API podem exigir algumas alterações manuais.

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.parquet function. This function is supported by Snowpark, however, there are some differences that would require some manual changes.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.parquet function that generates this EWI.

file_paths = [
  "path/to/your/file1.parquet",
  "path/to/your/file2.parquet",
  "path/to/your/file3.parquet",
]

df = session.read.parquet(
  *file_paths,
  mergeSchema="true",
  pathGlobFilter="*file*",
  recursiveFileLookup="true",
  modifiedBefore="2024-12-31T00:00:00",
  modifiedAfter="2023-12-31T00:00:00"
)

Saída

The SMA adds the EWI SPRKPY1029 to the output code to let you know that this function is supported by Snowpark, but it requires some manual adjustments. Please note that the options supported by Snowpark are transformed into option function calls and those that are not supported are removed. This is explained in more detail in the next sections.

file_paths = [
  "path/to/your/file1.parquet",
  "path/to/your/file2.parquet",
  "path/to/your/file3.parquet"
]

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => This issue appears when the tool detects the usage of pyspark.sql.readwriter.DataFrameReader.parquet. This function is supported, but some of the differences between Snowpark and the Spark API might require making some manual changes.
df = session.read.option("PATTERN", "*file*").parquet(
  *file_paths
)

Correção recomendada

In this section, we explain how to configure the paths and options parameters to make them work in Snowpark.

1. parâmetro paths

In Spark, this parameter can be a local or cloud location. Snowpark only accepts cloud locations using a snowflake stage. So, you can create a temporal stage and add each file into it using the prefix file://.

2. parâmetro options

Snowpark does not allow defining the different options as parameters of the parquet function. As a workaround, you can use the option or options functions to specify those parameters as extra options of the DataFrameReader.

Observe que options do Snowpark não são exatamente iguais a options do PySpark, portanto, pode ser necessário fazer algumas alterações manuais. Veja a seguir uma explicação mais detalhada de como configurar as opções mais comuns do PySpark no Snowpark.

2.1 opção mergeSchema

Parquet supports schema evolution, allowing users to start with a simple schema and gradually add more columns as needed. This can result in multiple parquet files with different but compatible schemas. In Snowflake, thanks to the infer_schema capabilities you don’t need to do that and therefore the mergeSchema option can just be removed.

2.2 opção pathGlobFilter

If you want to load only a subset of files from the stage, you can use the pattern option to specify a regular expression that matches the files you want to load. The SMA already automates this as you can see in the output of this scenario.

2.3 opção recursiveFileLookupstr

This option is not supported by Snowpark. The best recommendation is to use a regular expression like with the pathGlobFilter option to achieve something similar.

2.4 opção modifiedBefore / modifiedAfter

You can achieve the same result in Snowflake by using the metadata columns.

Nota

As seguintes opções não são suportadas pelo Snowpark:

  • compression

  • datetimeRebaseMode

  • int96RebaseMode

  • mergeSchema

Abaixo está o exemplo completo de como o código de entrada deve ser transformado para que funcione no Snowpark:

from snowflake.snowpark.column import METADATA_FILE_LAST_MODIFIED, METADATA_FILENAME

temp_stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}')

session.file.put(f"file:///path/to/your/file1.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file2.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file3.parquet", f"@{temp_stage}")

df = session.read \
  .option("PATTERN", ".*file.*") \
  .with_metadata(METADATA_FILENAME, METADATA_FILE_LAST_MODIFIED) \
  .parquet(temp_stage) \
  .where(METADATA_FILE_LAST_MODIFIED < '2024-12-31T00:00:00') \
  .where(METADATA_FILE_LAST_MODIFIED > '2023-12-31T00:00:00')

Recomendações adicionais

SPRKPY1030

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.session.SparkSession.Builder.appName tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.session.SparkSession.Builder.appName function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.session.SparkSession.Builder.appName function that generates this EWI. In this example, the appName function is used to set MyApp as the name of the application.

session = SparkSession.builder.appName("MyApp").getOrCreate()

Saída

The SMA adds the EWI SPRKPY1030 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1030 => pyspark.sql.session.SparkSession.Builder.appName has a workaround, see documentation for more info
session = Session.builder.appName("MyApp").getOrCreate()

Correção recomendada

As a workaround, you can import the snowpark_extensions package which provides an extension for the appName function.

import snowpark_extensions
session = SessionBuilder.appName("MyApp").getOrCreate()

Recomendações adicionais

SPRKPY1031

Aviso

This issue code has been deprecated since Spark Conversion Core 2.7.0

Mensagem: pyspark.sql.column.Column.contains tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.contains function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.column.Column.contains function that generates this EWI. In this example, the contains function is used to filter the rows where the “City” column contains the substring “New”.

df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(col("City").contains("New"))

Saída

The SMA adds the EWI SPRKPY1031 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
#EWI: SPRKPY1031 => pyspark.sql.column.Column.contains has a workaround, see documentation for more info
df_filtered = df.filter(col("City").contains("New"))

Correção recomendada

As a workaround, you can use the snowflake.snowpark.functions.contains function by passing the column as the first argument and the element to search as the second argument. If the element to search is a literal value then it should be converted into a column expression using the lit function.

from snowflake.snowpark import functions as f
df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(f.contains(col("City"), f.lit("New")))

Recomendações adicionais

SPRKPY1032

Message: *spark element* is not defined

Categoria: Erro de conversão

Descrição

Esse problema aparece quando o SMA não consegue determinar um status de mapeamento apropriado para um determinado elemento. Isso significa que o SMA ainda não sabe se esse elemento é compatível ou não com o Snowpark. Observe que esse é um código de erro genérico usado pelo SMA para qualquer elemento não definido.

Cenário

Entrada

Below is an example of a function for which the SMA could not determine an appropriate mapping status. In this case, you should assume that not_defined_function() is a valid PySpark function and the code runs.

sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()

Saída

The SMA adds the EWI SPRKPY1032 to the output code to let you know that this element is not defined.

#EWI: SPRKPY1032 => pyspark.rdd.RDD.not_defined_function is not defined
sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()

Correção recomendada

Para tentar identificar o problema, você pode realizar as seguintes validações:

  • Verifique se o código-fonte tem a sintaxe correta e se está escrito corretamente.

  • Check if you are using a PySpark version supported by the SMA. To know which PySpark version is supported by the SMA at the moment of running the SMA, you can review the first page of the DetailedReport.docx file.

If this is a valid PySpark element, please report that you encountered a conversion error on that particular element using the Report an Issue option of the SMA and include any additional information that you think may be helpful.

Please note that if an element is not defined, it does not mean that it is not supported by Snowpark. You should check the Snowpark Documentation to verify if an equivalent element exist.

Recomendações adicionais

SPRKPY1033

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.functions.asc tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.functions.asc function, which has a workaround.

Cenários

The pyspark.sql.functions.asc function takes either a column object or the name of the column as a string as its parameter. Both scenarios are not supported by Snowpark so this EWI is generated.

Cenário 1

Entrada

Below is an example of a use of the pyspark.sql.functions.asc function that takes a column object as parameter.

df.orderBy(asc(col))

Saída

The SMA adds the EWI SPRKPY1033 to the output code to let you know that the asc function with a column object parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc(col))

Correção recomendada

As a workaround, you can call the snowflake.snowpark.Column.asc function from the column parameter.

df.orderBy(col.asc())
Cenário 2

Entrada

Below is an example of a use of the pyspark.sql.functions.asc function that takes the name of the column as parameter.

df.orderBy(asc("colName"))

Saída

The SMA adds the EWI SPRKPY1033 to the output code to let you know that the asc function with a column name parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc("colName"))

Correção recomendada

As a workaround, you can convert the string parameter into a column object using the snowflake.snowpark.functions.col function and then call the snowflake.snowpark.Column.asc function.

df.orderBy(col("colName").asc())

Recomendações adicionais

SPRKPY1034

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.functions.desc tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.functions.desc function, which has a workaround.

Cenários

The pyspark.sql.functions.desc function takes either a column object or the name of the column as a string as its parameter. Both scenarios are not supported by Snowpark so this EWI is generated.

Cenário 1

Entrada

Below is an example of a use of the pyspark.sql.functions.desc function that takes a column object as parameter.

df.orderBy(desc(col))

Saída

The SMA adds the EWI SPRKPY1034 to the output code to let you know that the desc function with a column object parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc(col))

Correção recomendada

As a workaround, you can call the snowflake.snowpark.Column.desc function from the column parameter.

df.orderBy(col.desc())
Cenário 2

Entrada

Below is an example of a use of the pyspark.sql.functions.desc function that takes the name of the column as parameter.

df.orderBy(desc("colName"))

Saída

The SMA adds the EWI SPRKPY1034 to the output code to let you know that the desc function with a column name parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc("colName"))

Correção recomendada

As a workaround, you can convert the string parameter into a column object using the snowflake.snowpark.functions.col function and then call the snowflake.snowpark.Column.desc function.

df.orderBy(col("colName").desc())

Recomendações adicionais

SPRKPY1035

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.functions.reverse tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.functions.reverse function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.functions.reverse function that generates this EWI. In this example, the reverse function is used to reverse each string of the word column.

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))

Saída

The SMA adds the EWI SPRKPY1035 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse("word"))

Correção recomendada

As a workaround, you can import the snowpark_extensions package which provides an extension for the reverse function.

import snowpark_extensions

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))

Recomendações adicionais

SPRKPY1036

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.column.Column.getField tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.getField function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.column.Column.getField function that generates this EWI. In this example, the getField function is used to extract the name from the info column.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info").getField("name"))

Saída

The SMA adds the EWI SPRKPY1036 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
#EWI: SPRKPY1036 => pyspark.sql.column.Column.getField has a workaround, see documentation for more info
df_with_name = df.withColumn("name", col("info").getField("name"))

Correção recomendada

As a workaround, you can use the Snowpark column indexer operator with the name of the field as the index.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info")["name"])

Recomendações adicionais

SPRKPY1037

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.functions.sort_array tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.functions.sort_array function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.functions.sort_array function that generates this EWI. In this example, the sort_array function is used to sort the numbers array in ascending and descending order.

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

Saída

The SMA adds the EWI SPRKPY1037 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

Correção recomendada

As a workaround, you can import the snowpark_extensions package which provides an extension for the sort_array function.

import snowpark_extensions

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

Recomendações adicionais

SPRKPY1038

Message: *spark element* is not yet recognized

Categoria: Erro de conversão

Descrição

Esse problema aparece quando há um elemento PySpark em seu código-fonte que não foi reconhecido pelo SMA. Isso pode ocorrer por diferentes motivos, como:

  • Um elemento que não existe em PySpark.

  • Um elemento que foi adicionado em uma versão do PySpark que o SMA ainda não suporta.

  • Um erro interno do SMA ao processar o elemento.

Esse é um código de erro genérico usado pelo SMA para qualquer elemento não reconhecido.

Cenário

Entrada

Abaixo está um exemplo de uso de uma função que não pôde ser reconhecida pelo SMA porque ela não existe em PySpark.

from pyspark.sql import functions as F
F.unrecognized_function()

Saída

The SMA adds the EWI SPRKPY1038 to the output code to let you know that this element could not be recognized.

from snowflake.snowpark import functions as F
#EWI: SPRKPY1038 => pyspark.sql.functions.non_existent_function is not yet recognized
F.unrecognized_function()

Correção recomendada

Para tentar identificar o problema, você pode realizar as seguintes validações:

  • Verificar se o elemento existe em PySpark.

  • Verifique se o elemento está escrito corretamente.

  • Check if you are using a PySpark version supported by the SMA. To know which PySpark version is supported by the SMA at the moment of running the SMA, you can review the first page of the DetailedReport.docx file.

If it is a valid PySpark element, please report that you encountered a conversion error on that particular element using the Report an Issue option of the SMA and include any additional information that you think may be helpful.

Please note that if an element could not be recognized by the SMA, it does not mean that it is not supported by Snowpark. You should check the Snowpark Documentation to verify if an equivalent element exist.

Recomendações adicionais

SPRKPY1039

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.column.Column.getItem tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.getItem function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.column.Column.getItem function that generates this EWI. In this example, the getItem function is used to get an item by position and by key.

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits").getItem(0))

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))

Saída

The SMA adds the EWI SPRKPY1039 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("first_fruit", col("fruits").getItem(0))

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))

Correção recomendada

Como solução alternativa, você pode usar o operador de indexador de coluna Snowpark com o nome ou a posição do campo como índice.

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits")[0])

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities")["apple"])

Recomendações adicionais

SPRKPY1040

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.functions.explode tem uma solução alternativa, consulte a documentação para obter mais informações

Categoria: Aviso

Descrição

This issue appears when the SMA detects a use of the pyspark.sql.functions.explode function, which has a workaround.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.functions.explode function that generates this EWI. In this example, the explode function is used to generate one row per array item for the numbers column.

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))

Saída

The SMA adds the EWI SPRKPY1040 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
#EWI: SPRKPY1040 => pyspark.sql.functions.explode has a workaround, see documentation for more info
exploded_df = df.select("name", explode(df.numbers).alias("number"))

Correção recomendada

As a workaround, you can import the snowpark_extensions package which provides an extension for the explode function.

import snowpark_extensions

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))

Recomendações adicionais

SPRKPY1041

Aviso

This issue code has been deprecated since Spark Conversion Core Version 2.9.0

Mensagem: pyspark.sql.functions.explode_outer tem uma solução alternativa

Categoria: Aviso

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.explode_outer which has a workaround.

Cenário

Entrada

O exemplo mostra o uso do método explode_outer em uma chamada de seleção.

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

df.select("id", "an_array", explode_outer("a_map")).show()

Saída

The tool adds the EWI SPRKPY1041 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

#EWI: SPRKPY1041 => pyspark.sql.functions.explode_outer has a workaround, see documentation for more info
df.select("id", "an_array", explode_outer("a_map")).show()

Correção recomendada

As a workaround, you can import the snowpark_extensions package, which contains a helper for the explode_outer function.

import snowpark_extensions

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

df.select("id", "an_array", explode_outer("a_map")).show()

Recomendações adicionais

SPRKPY1042

Mensagem: pyspark.sql.functions.posexplode tem uma solução alternativa

Categoria: Aviso

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.posexplode which has a workaround.

Cenários

There are a couple of scenarios that this method can handle depending on the type of column it is passed as a parameter, it can be a list of values or a map/directory (keys/values).

Cenário 1

Entrada

Below is an example of the usage of posexplode passing as a parameter of a list of values.

df = spark.createDataFrame(
    [Row(a=1,
         intlist=[1, 2, 3])])

df.select(posexplode(df.intlist)).collect()

Saída

The tool adds the EWI SPRKPY1042 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [Row(a=1,
         intlist=[100, 200, 300])])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info

df.select(posexplode(df.intlist)).show()

Correção recomendada

For having the same behavior, use the method functions.flatten, drop extra columns, and rename index and value column names.

df = spark.createDataFrame(
  [Row(a=1,
       intlist=[1, 2, 3])])

df.select(
    flatten(df.intlist))\
    .drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
    .rename({"INDEX": "pos", "VALUE": "col"}).show()
Cenário 2

Entrada

Below is another example of the usage of posexplode passing as a parameter a map/dictionary (keys/values)

df = spark.createDataFrame([
    [1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])

df.select(posexplode(df.maps)).show()

Saída

The tool adds the EWI SPRKPY1042 indicating that a workaround can be implemented.

df = spark.createDataFrame([
    [1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info

df.select(posexplode(df.maps)).show()

Correção recomendada

As a workaround, you can use functions.row_number to get the position and functions.explode with the name of the field to get the value the key/value for dictionaries.

df = spark.createDataFrame([
    [10, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [11, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
    schema=["idx", "lists", "maps", "strs"])

window = Window.orderBy(col("idx").asc())

df.select(
    row_number().over(window).alias("pos"),
    explode(df.maps).alias("key", "value")).show()

Observação: usar row_number não é totalmente equivalente, pois começa com 1 (não zero como o método spark)

Recomendações adicionais

SPRKPY1043

Mensagem: pyspark.sql.functions.posexplode_outer tem uma solução alternativa

Categoria: Aviso

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.posexplode_outer which has a workaround.

Cenários

There are a couple of scenarios that this method can handle depending on the type of column it is passed as a parameter, it can be a list of values or a map/directory (keys/values).

Cenário 1

Entrada

Below is an example that shows the usage of posexplode_outer passing a list of values.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))

df.select("id", "an_array", posexplode_outer("an_array")).show()

Saída

The tool adds the EWI SPRKPY1043 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info

df.select("id", "an_array", posexplode_outer("an_array")).show()

Correção recomendada

For having the same behavior, use the method functions.flatten sending the outer parameter in True, drop extra columns, and rename index and value column names.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))

df.select(
    flatten(df.an_array, outer=True))\
    .drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
    .rename({"INDEX": "pos", "VALUE": "col"}).show()
Cenário 2

Entrada

Abaixo está outro exemplo de uso do posexplode_outer passando um mapa/dicionário (chaves/valores)

df = spark.createDataFrame(
    [
        (1, {"x": 1.0}),
        (2, {}),
        (3, None)],
    ("id", "a_map"))

df.select(posexplode_outer(df.a_map)).show()

Saída

The tool adds the EWI SPRKPY1043 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [
        (1, {"x": "Ashi Garami"}),
        (2, {}),
        (3, None)],
    ("id", "a_map"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info

df.select(posexplode_outer(df.a_map)).show()

Correção recomendada

As a workaround, you can use functions.row_number to get the position and functions.explode_outer with the name of the field to get the value of the key/value for dictionaries.

df = spark.createDataFrame(
    [
        (1, {"x": "Ashi Garami"}),
        (2,  {}),
        (3, None)],
    ("id", "a_map"))

window = Window.orderBy(col("id").asc())

df.select(
    row_number().over(window).alias("pos"),
          explode_outer(df.a_map)).show()

Observação: usar row_number não é totalmente equivalente, pois começa com 1 (não zero como o método spark)

Recomendações adicionais

SPRKPY1044

Aviso

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

Mensagem: pyspark.sql.functions.split tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.split which has a workaround.

Cenários

Há alguns cenários, dependendo da quantidade de parâmetros passados para o método.

Cenário 1

Entrada

Below is an example when the function split has just the str and pattern parameters

F.split('col', '\\|')

Saída

The tool shows the EWI SPRKPY1044 indicating there is a workaround.

#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|')

Correção recomendada

As a workaround, you can call the function snowflake.snowpark.functions.lit with the pattern parameter and send it into the split.

F.split('col', lit('\\|'))
## the result of lit will be sent to the split function

Cenário 2

Entrada

Below is another example when the function split has the str, pattern, and limit parameters.

F.split('col', '\\|', 2)

Saída

The tool shows the EWI SPRKPY1044 indicating there is a workaround.

#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|', 2)

Correção recomendada

Esse cenário específico não é suportado.

Recomendações adicionais

SPRKPY1045

Mensagem: pyspark.sql.functions.map_values tem uma solução alternativa

Categoria: Aviso.

Descrição

Essa função é usada para extrair a lista de valores de uma coluna que contém um mapa/dicionário (chaves/valores).

The issue appears when the tool detects the usage of pyspark.sql.functions.map_values which has a workaround.

Cenário

Entrada

Below is an example of the usage of the method map_values.

df = spark.createDataFrame(
    [(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
    ("id", "a_map"))

df.select(map_values("a_map")).show()

Saída

The tool adds the EWI SPRKPY1045 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
    ("id", "a_map"))
#EWI: SPRKPY1045 => pyspark.sql.functions.map_values has a workaround, see documentation for more info

df.select(map_values("a_map")).show()

Correção recomendada

As a workaround, you can create an udf to get the values for a column. The below example shows how to create the udf, then assign it to F.map_values, and then make use of it.

from snowflake.snowpark import functions as F
from snowflake.snowpark.types import ArrayType, MapType

map_values_udf=None

def map_values(map):
    global map_values_udf
    if not map_values_udf:
        def _map_values(map: dict)->list:
            return list(map.values())
        map_values_udf = F.udf(_map_values,return_type=ArrayType(),input_types=[MapType()],name="map_values",is_permanent=False,replace=True)
    return map_values_udf(map)

F.map_values = map_values

df.select(map_values(colDict))

Recomendações adicionais

SPRKPY1046

Aviso

This issue code has been deprecated since Spark Conversion Core Version 2.1.22

Mensagem: pyspark.sql.functions.monotonically_increasing_id tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.monotonically_increasing_id which has a workaround.

Cenário

Entrada

Below is an example of the usage of the method monotonically_increasing_id.

from pyspark.sql import functions as F

spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()

Saída

The tool adds the EWI SPRKPY1046 indicating that a workaround can be implemented.

from pyspark.sql import functions as F
#EWI: SPRKPY1046 => pyspark.sql.functions.monotonically_increasing_id has a workaround, see documentation for more info
spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()

Correção recomendada

Atualize a versão da ferramenta.

Recomendações adicionais

SPRKPY1047

Aviso

This issue code has been deprecated since Spark Conversion Core Version 4.6.0

Descrição

This issue appears when the tool detects the usage of pyspark.context.SparkContext.setLogLevel which has a workaround.

Cenário

Entrada

Below is an example of the usage of the method setLogLevel.

sparkSession.sparkContext.setLogLevel("WARN")

Saída

The tool adds the EWI SPRKPY1047 indicating that a workaround can be implemented.

#EWI: SPRKPY1047 => pyspark.context.SparkContext.setLogLevel has a workaround, see documentation for more info
sparkSession.sparkContext.setLogLevel("WARN")

Correção recomendada

Replace the setLogLevel function usage with logging.basicConfig that provides a set of convenience functions for simple logging usage. In order to use it, we need to import two modules, «logging» and «sys», and the level constant should be replaced using the «Level equivalent table»:

import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.WARNING)
  • Tabela de níveis equivalentes

Parâmetro de origem de nível

Parâmetro de destino de nível

«ALL»

<mark style=»color:red;»>**This has no equivalent**</mark>

«DEBUG»

logging.DEBUG

«ERROR»

logging.ERROR

«FATAL»

logging.CRITICAL

«INFO»

logging.INFO

«OFF»

logging.NOTSET

«TRACE»

<mark style=»color:red;»>**This has no equivalent**</mark>

«WARN»

logging.WARNING

Recomendações adicionais

SPRKPY1048

Aviso

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

Mensagem: pyspark.sql.session.SparkSession.conf tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.conf which has a workaround.

Cenário

Entrada

Below is an example of how to set a configuration into the property conf .

spark.conf.set("spark.sql.crossJoin.enabled", "true")

Saída

The tool adds the EWI SPRKPY1048 indicating that a workaround can be implemented.

#EWI: SPRKPY1048 => pyspark.sql.session.SparkSession.conf has a workaround, see documentation for more info
spark.conf.set("spark.sql.crossJoin.enabled", "true")

Correção recomendada

SparkSession.conf é usado para passar algumas configurações específicas usadas apenas pelo Pyspark e não se aplica ao Snowpark. Você pode remover ou comentar o código

#spark.conf.set("spark.sql.crossJoin.enabled", "true")

Recomendações adicionais

SPRKPY1049

Aviso

This issue code has been deprecated since Spark Conversion Core Version 2.1.9

Mensagem: pyspark.sql.session.SparkSession.sparkContext tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.sparkContext which has a workaround.

Cenário

Entrada

Below is an example that creates a spark session and then uses the SparkContext property to print the appName.

print("APP Name :"+spark.sparkContext.appName())

Saída

The tool adds the EWI SPRKPY1049 indicating that a workaround can be implemented.

#EWI: SPRKPY1049 => pyspark.sql.session.SparkSession.sparkContext has a workaround, see documentation for more info
print("APP Name :"+spark.sparkContext.appName())

Correção recomendada

O SparkContext não é compatível com SnowPark, mas você pode acessar os métodos e as propriedades de SparkContext diretamente da instância Session.

## Pyspark
print("APP Name :"+spark.sparkContext.appName())
can be used in SnowPark removing the sparkContext as:
#Manual adjustment in SnowPark
print("APP Name :"+spark.appName());

Recomendações adicionais

SPRKPY1050

Mensagem: pyspark.conf.SparkConf.set tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.conf.SparkConf.set which has a workaround.

Cenário

Entrada

Below is an example that sets a variable using conf.set.

conf = SparkConf().setAppName('my_app')

conf.set("spark.storage.memoryFraction", "0.5")

Saída

The tool adds the EWI SPRKPY1050 indicating that a workaround can be implemented.

conf = SparkConf().setAppName('my_app')

#EWI: SPRKPY1050 => pyspark.conf.SparkConf.set has a workaround, see documentation for more info
conf.set("spark.storage.memoryFraction", "0.5")

Correção recomendada

SparkConf.set é usado para definir uma configuração usada somente pelo Pyspark e não se aplica ao Snowpark. Você pode remover ou comentar o código

#conf.set("spark.storage.memoryFraction", "0.5")

Recomendações adicionais

SPRKPY1051

Aviso

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

Mensagem: pyspark.sql.session.SparkSession.Builder.master tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects pyspark.sql.session.SparkSession.Builder.master usage which has a workaround.

Cenário

Entrada

Below is an example of the usage of the method builder.master to set the Spark Master URL to connect to local using 1 core.

spark = SparkSession.builder.master("local[1]")

Saída

The tool adds the EWI SPRKPY1051 indicating that a workaround can be implemented.

#EWI: SPRKPY1051 => pyspark.sql.session.SparkSession.Builder.master has a workaround, see documentation for more info
spark = Session.builder.master("local[1]")

Correção recomendada

pyspark.sql.session.SparkSession.Builder.master is used to set up a Spark Cluster. Snowpark doesn’t use Spark Clusters so you can remove or comment the code.

## spark = Session.builder.master("local[1]")

Recomendações adicionais

SPRKPY1052

Aviso

This issue code has been deprecated since Spark Conversion Core Version 2.8.0

Mensagem: pyspark.sql.session.SparkSession.Builder.enableHiveSupport tem uma solução alternativa

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.Builder.enableHiveSupport which has a workaround.

Cenário

Entrada

Below is an example that configures the SparkSession and enables the hive support using the method enableHiveSupport.

spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .enableHiveSupport().getOrCreate()

Saída

The tool adds the EWI SPRKPY1052 indicating that a workaround can be implemented.

#EWI: SPRKPY1052 => pyspark.sql.session.SparkSession.Builder.enableHiveSupport has a workaround, see documentation for more info
spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .enableHiveSupport().getOrCreate()

Correção recomendada

Remove the use of enableHiveSupport function because it is not needed in Snowpark.

spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .getOrCreate()

Recomendações adicionais

SPRKPY1053

Mensagem: Ocorreu um erro ao extrair os arquivos dbc.

Categoria: Aviso.

Descrição

Esse problema aparece quando um arquivo dbc não pode ser extraído. Esse aviso pode ser causado por um ou mais dos seguintes motivos: muito pesado, inacessível, somente leitura, etc.

Recomendações adicionais

  • Como solução alternativa, você pode verificar o tamanho do arquivo se ele for muito pesado para ser processado. Além disso, analise se a ferramenta pode acessá-la para evitar problemas de acesso.

  • Para obter mais suporte, envie um e-mail para snowconvert-info@snowflake.com. Se tiver um contrato de suporte com a Snowflake, entre em contato com seu engenheiro de vendas para que ele possa direcionar suas necessidades de suporte.

SPRKPY1080

Mensagem: O valor de SparkContext é substituído pela variável “session”.

Categoria: Aviso

Descrição

O contexto do Spark é armazenado em uma variável chamada session que cria uma sessão do Snowpark.

Cenário

Entrada

Este snippet descreve um SparkContext

## Input Code
from pyspark import SparkContext
from pyspark.sql import SparkSession

def example1():

    sc = SparkContext("local[*]", "TestApp")

    sc.setLogLevel("ALL")
    sc.setLogLevel("DEBUG")

Saída

Nesse código de saída, o SMA substituiu o PySpark.SparkContext por um SparkSession, observe que o SMA também adiciona um modelo para substituir a conexão no arquivo «connection.json» e, em seguida, carrega essa configuração na variável connection_parameter.

## Output Code
import logging
import sys
import json
from snowflake.snowpark import Session
from snowflake.snowpark import Session

def example1():
    jsonFile = open("connection.json")
    connection_parameter = json.load(jsonFile)
    jsonFile.close()
    #EWI: SPRKPY1080 => The value of SparkContext is replaced with 'session' variable.
    sc = Session.builder.configs(connection_parameter).getOrCreate()
    sc.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
    logging.basicConfig(stream = sys.stdout, level = logging.NOTSET)
    logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)

Correção recomendada

O arquivo de configuração «connection.json» deve ser atualizado com as informações de conexão necessárias:

{
  "user": "my_user",
  "password": "my_password",
  "account": "my_account",
  "role": "my_role",
  "warehouse": "my_warehouse",
  "database": "my_database",
  "schema": "my_schema"
}

Recomendações adicionais

SPRKPY1054

Mensagem: pyspark.sql.readwriter.DataFrameReader.format não é suportado.

Categoria: Aviso.

Descrição

This issue appears when the pyspark.sql.readwriter.DataFrameReader.format has an argument that is not supported by Snowpark.

Cenários

There are some scenarios depending on the type of format you are trying to load. It can be a supported , or non-supported format.

Cenário 1

Entrada

A ferramenta analisa o tipo de formato que está tentando carregar; os formatos compatíveis são:

  • Csv

  • JSON

  • Parquet

  • Orc

The below example shows how the tool transforms the format method when passing a Csv value.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df1 = spark.read.format('csv').load('/path/to/file')

Saída

The tool transforms the format method into a Csv method call.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

df1 = spark.read.csv('/path/to/file')

Correção recomendada

Nesse caso, a ferramenta não mostra o EWI, o que significa que não há necessidade de correção.

Cenário 2

Entrada

The below example shows how the tool transforms the format method when passing a Jdbc value.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

df2 = spark.read.format('jdbc') \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Saída

The tool shows the EWI SPRKPY1054 indicating that the value «jdbc» is not supported.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "jdbc" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported

df2 = spark.read.format('jdbc') \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Correção recomendada

For the not supported scenarios, there is no specific fix since it depends on the files that are trying to be read.

Cenário 3

Entrada

The below example shows how the tool transforms the format method when passing a CSV, but using a variable instead.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

myFormat = 'csv'
df3 = spark.read.format(myFormat).load('/path/to/file')

Saída

Since the tool can not determine the value of the variable in runtime, shows the EWI SPRKPY1054 indicating that the value «» is not supported.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

myFormat = 'csv'
#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported
df3 = spark.read.format(myFormat).load('/path/to/file')

Correção recomendada

As a workaround, you can check the value of the variable and add it as a string to the format call.

Recomendações adicionais

SPRKPY1055

Mensagem: o valor da chave pyspark.sql.readwriter.DataFrameReader.option não é suportado.

Categoria: Aviso.

Descrição

This issue appears when the pyspark.sql.readwriter.DataFrameReader.option key value is not supported by SnowFlake.

A ferramenta analisa os parâmetros de chamada de opção e, dependendo do método (CSV ou JSON ou PARQUET), o valor da chave pode ter ou não um equivalente no Snowpark. Se todos os parâmetros tiverem um equivalente, a ferramenta não adicionará o EWI e substituirá o valor da chave pelo seu equivalente; caso contrário, a ferramenta adicionará o EWI.

Lista de equivalências:

  • Equivalências para CSV:

Chaves de opção do Spark

Equivalências do Snowpark

sep

FIELD_DELIMITER

header

PARSE_HEADER

lineSep

RECORD_DELIMITER

pathGlobFilter

PATTERN

quote

FIELD_OPTIONALLY_ENCLOSED_BY

nullValue

NULL_IF

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

inferSchema

INFER_SCHEMA

delimiter

FIELD_DELIMITER

  • Equivalências para JSON:

Chaves de opção do Spark

Equivalências do Snowpark

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

pathGlobFilter

PATTERN

  • Equivalências para PARQUET:

Chaves de opção do Spark

Equivalências do Snowpark

pathGlobFilter

PATTERN

Qualquer outra opção de chave que não esteja em uma das tabelas acima não é compatível ou não tem um equivalente no Snowpark. Se esse for o caso, a ferramenta adiciona o EWI com as informações do parâmetro e o remove da cadeia.

Cenários

Os cenários abaixo se aplicam a CSV, JSON e PARQUET.

There are a couple of scenarios depending on the value of the key used in the option method.

Cenário 1

Entrada

Below is an example of a option call using a equivalent key.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## CSV example:
spark.read.option("header", True).csv(csv_file_path)

## Json example:
spark.read.option("dateFormat", "dd-MM-yyyy").json(json_file_path)

## Parquet example:
spark.read.option("pathGlobFilter", "*.parquet").parquet(parquet_file_path)

Saída

A ferramenta transforma a chave com o equivalente correto.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()

## CSV example:
spark.read.option("PARSE_HEADER", True).csv(csv_file_path)

## Json example:
spark.read.option("DATE_FORMAT", "dd-MM-yyyy").json(json_file_path)

## Parquet example:
spark.read.option("PATTERN", "*.parquet").parquet(parquet_file_path)

Correção recomendada

Como a ferramenta transforma o valor da chave, não há necessidade de correção.

Cenário 2

Entrada

Below is an example of a option call using a non-equivalent key.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## CSV example:
spark.read.option("anotherKeyValue", "myVal").csv(csv_file_path)

## Json example:
spark.read.option("anotherKeyValue", "myVal").json(json_file_path)

## Parquet example:
spark.read.option("anotherKeyValue", "myVal").parquet(parquet_file_path)

Saída

The tool adds the EWI SPRKPY1055 indicating the key is not supported and removes the option call.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()

## CSV example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.csv(csv_file_path)

## Json example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.json(json_file_path)

## Parquet example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.parquet(parquet_file_path)

Correção recomendada

Recomenda-se que verifique o comportamento após a transformação.

Recomendações adicionais

  • Quando houver parâmetros não equivalentes, é recomendável verificar o comportamento após a transformação.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1056

Aviso

Este código de problema está obsoleto

Mensagem: pyspark.sql.readwriter.DataFrameReader.option argument _ <argument_name> _ não é um literal e não pode ser avaliado

Categoria: Aviso

Descrição

This issue appears when the argument’s key or value of the pyspark.sql.readwriter.DataFrameReader.option function is not a literal value (for example a variable). The SMA does a static analysis of your source code, and therefore it is not possible to evaluate the content of the argument.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.option function that generates this EWI.

my_value = ...
my_option = ...

df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')

Saída

The SMA adds the EWI SPRKPY1056 to the output code to let you know that the argument of this function is not a literal value, and therefore it could not be evaluated by the SMA.

my_value = ...
my_option = ...

#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument "dateFormat" is not a literal and can't be evaluated
df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument key is not a literal and can't be evaluated
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')

Correção recomendada

Even though the SMA was unable to evaluate the argument, it does not mean that it is not supported by Snowpark. Please make sure that the value of the argument is valid and equivalent in Snowpark by checking the documentation.

Recomendações adicionais

SPRKPY1057

Aviso

This Issue Code has been deprecated since Spark Conversion Core Version 4.8.0

Message: PySpark Dataframe Option argument contains a value that is not a literal, therefore cannot be evaluated

Categoria: Aviso.

Descrição

Esse código de problema está obsoleto. Se você estiver usando uma versão mais antiga, atualize para a mais recente.

Recomendações adicionais

SPRKPY1058

Mensagem: < método > com < chave > Não há suporte para a chave específica da plataforma.

Categoria: ConversionError

Descrição

The get and set methods from pyspark.sql.conf.RuntimeConfig are not supported with a Platform specific key.

Cenários

Not all usages of get or set methods are going to have an EWI in the output code. This EWI appears when the tool detects the usage of these methods with a Platform specific key which is not supported.

Cenário 1

Entrada

Below is an example of the get or set methods with supported keys in Snowpark.

session.conf.set("use_constant_subquery_alias", False)
spark.conf.set("sql_simplifier_enabled", True)

session.conf.get("use_constant_subquery_alias")
session.conf.get("use_constant_subquery_alias")

Saída

Como as chaves são compatíveis com o Snowpark, a ferramenta não adiciona o EWI no código de saída.

session.conf.set("use_constant_subquery_alias", True)
session.conf.set("sql_simplifier_enabled", False)

session.conf.get("use_constant_subquery_alias")
session.conf.get("sql_simplifier_enabled")

Correção recomendada

Não há nenhuma correção recomendada para esse cenário.

Cenário 2

Entrada

Abaixo está um exemplo usando chaves não suportadas.

data =
    [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

session.conf.set("spark.sql.shuffle.partitions", "50")
spark.conf.set("spark.yarn.am.memory", "1g")

session.conf.get("spark.sql.shuffle.partitions")
session = spark.conf.get("spark.yarn.am.memory")

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

Saída

The tool adds this EWI SPRKPY1058 on the output code to let you know that these methods are not supported with a Platform specific key.

data =
    [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.set("spark.sql.shuffle.partitions", "50")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.set("spark.yarn.am.memory", "1g")

#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.get("spark.sql.shuffle.partitions")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.get("spark.yarn.am.memory")

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

Correção recomendada

A correção recomendada é remover esses métodos.

data =
    [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

Recomendações adicionais

SPRKPY1059

Aviso

This issue code has been deprecated since Spark Conversion Core Version 2.45.1

Message: pyspark.storagelevel.StorageLevel has a workaround, see documentation.

Categoria: Aviso

Descrição

Currently, the use of StorageLevel is not required in Snowpark since Snowflake controls the storage. For more information, you can refer to the EWI SPRKPY1072

Recomendações adicionais

SPRKPY1060

Mensagem: O mecanismo de autenticação é connection.json (modelo fornecido).

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.conf.SparkConf.

Cenário

Entrada

Como o mecanismo de autenticação é diferente no Snowpark, a ferramenta remove os usos e cria um arquivo de configuração de conexão (connection.json) em vez disso.

from pyspark import SparkConf

my_conf = SparkConf(loadDefaults=True)

Saída

The tool adds the EWI SPRKPY1060 indicating that the authentication mechanism is different.

#EWI: SPRKPY1002 => pyspark.conf.SparkConf is not supported
#EWI: SPRKPY1060 => The authentication mechanism is connection.json (template provided).
#my_conf = Session.builder.configs(connection_parameter).getOrCreate()

my_conf = None

Correção recomendada

To create a connection it is necessary that you fill in the information in the connection.json file.

{
  "user": "<USER>",
  "password": "<PASSWORD>",
  "account": "<ACCOUNT>",
  "role": "<ROLE>",
  "warehouse": "<WAREHOUSE>",
  "database": "<DATABASE>",
  "schema": "<SCHEMA>"
}

Recomendações adicionais

SPRKPY1061

Mensagem: O Snowpark não é compatível com as funções unix_timestamp

Categoria: Aviso

Descrição

In Snowpark, the first parameter is mandatory; the issue appears when the tool detects the usage of pyspark.sql.functions.unix_timestamp with no parameters.

Cenário

Entrada

Below an example that calls the unix_timestamp method without parameters.

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ['dt', 'val'])
df.select(unix_timestamp()).show()

Saída

The Snowpark signature for this function unix_timestamp(e: ColumnOrName, fmt: Optional["Column"] = None), as you can notice the first parameter it’s required.

The tool adds this EWI SPRKPY1061 to let you know that function unix_timestamp with no parameters it’s not supported in Snowpark.

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ['dt', 'val'])
#EWI: SPRKPY1061 => Snowpark does not support unix_timestamp functions with no parameters. See documentation for more info.
df.select(unix_timestamp()).show()

Correção recomendada

Como solução alternativa, você pode adicionar pelo menos o nome ou a coluna da cadeia de caracteres de carimbo de data/hora.

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ["dt", "val"])
df.select(unix_timestamp("dt")).show()

Recomendações adicionais

SPRKPY1062

Mensagem: O Snowpark não é compatível com GroupedData.pivot sem o parâmetro «values».

Categoria: Aviso

Descrição

This issue appears when the SMA detects the usage of the pyspark.sql.group.GroupedData.pivot function without the «values» parameter (the list of values to pivot on).

No momento, a função de pivô do Snowpark Python exige que você especifique explicitamente a lista de valores distintos sobre os quais fazer o pivô.

Cenários

Cenário 1

Entrada

The SMA detects an expression that matches the pattern dataFrame.groupBy("columnX").pivot("columnY") and the pivot does not have the values parameter.

df.groupBy("date").pivot("category").sum("amount")

Saída

O SMA adiciona uma mensagem EWI indicando que não há suporte para a função de pivô sem o parâmetro «values».

Além disso, ele adicionará como segundo parâmetro da função de pivô uma compreensão de lista que calcula a lista de valores que serão convertidos em colunas. Lembre-se de que essa operação não é eficiente para grandes conjuntos de dados, e é aconselhável indicar os valores explicitamente.

#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df.groupBy("date").pivot("category", [v[0] for v in df.select("category").distinct().limit(10000).collect()]]).sum("amount")

Correção recomendada

Para esse cenário, o SMA adiciona um segundo parâmetro da função de pivô, uma compreensão de lista que calcula a lista de valores que serão convertidos em colunas, mas você pode usar uma lista de valores distintos para pivotar, como segue:

df = spark.createDataFrame([
      Row(category="Client_ID", date=2012, amount=10000),
      Row(category="Client_name",   date=2012, amount=20000)
  ])

df.groupBy("date").pivot("category", ["dotNET", "Java"]).sum("amount")
Cenário 2

Entrada

The SMA couldn’t detect an expression that matches the pattern dataFrame.groupBy("columnX").pivot("columnY") and the pivot does not have the values parameter.

df1.union(df2).groupBy("date").pivot("category").sum("amount")

Saída

O SMA adiciona uma mensagem EWI indicando que não há suporte para a função de pivô sem o parâmetro «values».

#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df1.union(df2).groupBy("date").pivot("category").sum("amount")

Correção recomendada

Adicione uma lista de valores distintos para fazer o pivô, como segue:

df = spark.createDataFrame([
      Row(course="dotNET", year=2012, earnings=10000),
      Row(course="Java",   year=2012, earnings=20000)
  ])

df.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").show()

Recomendações adicionais

  • O cálculo da lista de valores distintos para pivotar não é uma operação eficiente em grandes conjuntos de dados e pode se tornar uma chamada de bloqueio. Considere a possibilidade de indicar explicitamente a lista de valores distintos para dinamizar.

  • Se não quiser especificar explicitamente a lista de valores distintos para pivotar (o que não é aconselhável), você pode adicionar o seguinte código como o segundo argumento da função pivot para inferir os valores em tempo de execução*

[v[0] for v in <df>.select(<column>).distinct().limit(<count>).collect()]]

****Replace*** :code:`<df>` with the corresponding DataFrame, with the column to pivot and with the number of rows to select.

SPRKPY1063

Mensagem: pyspark.sql.pandas.functions.pandas_udf tem uma solução alternativa.

Categoria: Aviso

Descrição

This issue appears when the tool detects the usage of pyspark.sql.pandas.functions.pandas_udf which has a workaround.

Cenário

Entrada

A função pandas_udf é usada para criar funções definidas pelo usuário que trabalham com grandes quantidades de dados.

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})
df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])
new_df = df.groupby().apply(modify_df)

Saída

O SMA adiciona uma mensagem EWI indicando que o pandas_udf tem uma solução alternativa.

#EWI: SPRKPY1062 => pyspark.sql.pandas.functions.pandas_udf has a workaround, see documentation for more info
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)

def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})

df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])

new_df = df.groupby().apply(modify_df)

Correção recomendada

Specify explicitly the parameters types as a new parameter input_types, and remove functionType parameter if applies. Created function must be called inside a select statement.

@pandas_udf(
    return_type = schema,
    input_types = [PandasDataFrameType([IntegerType(), IntegerType()])]
)

def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})

df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])

new_df = df.groupby().apply(modify_df) # You must modify function call to be a select and not an apply

Recomendações adicionais

SPRKPY1064

Message: The *Spark element* does not apply since snowflake uses snowpipe mechanism instead.

Categoria: Aviso

Descrição

Esse problema aparece quando a ferramenta detecta o uso de qualquer elemento da biblioteca pyspark.streaming:

Cenário

Entrada

Abaixo está um exemplo com um dos elementos que acionam esse EWI.

from pyspark.streaming.listener import StreamingListener

var = StreamingListener.Java
var.mro()

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

Saída

The SMA adds the EWI SPRKPY1064 on the output code to let you know that this function does not apply.

#EWI: SPRKPY1064 => The element does not apply since snowflake uses snowpipe mechanism instead.

var = StreamingListener.Java
var.mro()

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

Correção recomendada

The SMA removes the import statement and adds the issue to the Issues.csv inventory, remove any usages of the Spark element.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

Recomendações adicionais

SPRKPY1065

Mensagem: O pyspark.context.SparkContext.broadcast não se aplica, pois o snowflake usa o mecanismo data-clustering para computar os dados.

Categoria: Aviso

Descrição

This issue appears when the tool detects the usage of element pyspark.context.SparkContext.broadcast, which is not necessary due to the use of data-clustering of Snowflake.

Código de entrada

Neste exemplo, é criada uma variável de transmissão. Essas variáveis permitem que os dados sejam compartilhados de forma mais eficiente por todos os nós.

sc = SparkContext(conf=conf_spark)

mapping = {1: 10001, 2: 10002}

bc = sc.broadcast(mapping)

Código de saída

O SMA adiciona uma mensagem EWI indicando que a transmissão não é necessária.

sc = conf_spark

mapping = {1: 10001, 2: 10002}
#EWI: SPRKPY1065 => The element does not apply since snowflake use data-clustering mechanism to compute the data.

bc = sc.broadcast(mapping)

Correção recomendada

Remova todos os usos de pyspark.context.SparkContext.broadcast.

sc = conf_spark

mapping = {1: 10001, 2: 10002}

Recomendações adicionais

SPRKPY1066

Mensagem: O elemento Spark não se aplica, pois o Snowflake usa o mecanismo de micropartição criado automaticamente.

Categoria: Aviso

Descrição

Esse problema aparece quando a ferramenta detecta o uso de elementos relacionados a partições:

Those elements do not apply due the use of micro-partitions of Snowflake.

Código de entrada

In this example sortWithinPartitions it’s used to create a partition in a DataFrame sorted by the specified column.

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.sortWithinPartitions("age", ascending=False)

Código de saída

O SMA adiciona uma mensagem EWI indicando que o elemento Spark não é necessário.

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
#EWI: SPRKPY1066 => The element does not apply since snowflake use micro-partitioning mechanism are created automatically.
df.sortWithinPartitions("age", ascending=False)

Correção recomendada

Remova o uso do elemento.

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])

Recomendações adicionais

SPRKPY1067

Mensagem: O pyspark.sql.functions.split tem parâmetros que não são suportados pelo Snowpark.

Categoria: Aviso

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.split with more than two parameters or a regex pattern as a parameter; both cases are not supported.

Cenários

Cenário 1

Código de entrada

Neste exemplo, a função split tem mais de dois parâmetros.

df.select(split(columnName, ",", 5))

Código de saída

A ferramenta adiciona esse EWI no código de saída para informar que essa função não é suportada quando tem mais de dois parâmetros.

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, ",", 5))

Correção recomendada

Mantenha a função split com apenas dois parâmetros.

df.select(split(columnName, ","))
Cenário 2

Código de entrada

Neste exemplo, a função split tem um padrão regex como parâmetro.

df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))

Código de saída

A ferramenta adiciona esse EWI no código de saída para informar que essa função não é suportada quando tem um padrão regex como parâmetro.

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))

Correção recomendada

The spark signature for this method functions.split(str: ColumnOrName, pattern: str, limit: int = - 1) not exactly match with the method in Snowpark functions.split(str: Union[Column, str], pattern: Union[Column, str]) so for now the scenario using regular expression does not have a recommended fix.

Recomendações adicionais

SPRKPY1068

Mensagem: toPandas contém colunas do tipo ArrayType que não são suportadas e têm uma solução alternativa.

Categoria: Aviso

Descrição

pyspark.sql.DataFrame.toPandas doesn’t work properly If there are columns of type ArrayType. The workaround for these cases is converting those columns into a Python Dictionary by using json.loads method.

Cenário

Entrada

ToPandas retorna os dados do DataFrame original como um Pandas DataFrame.

sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])

pandasDF = sparkDF.toPandas()

Saída

A ferramenta adiciona este EWI para que você saiba que o toPandas não é suportado se houver colunas do tipo ArrayType, mas tem uma solução alternativa.

sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])
#EWI: SPRKPY1068 => toPandas doesn't work properly If there are columns of type ArrayType. The workaround for these cases is converting those columns into a Python Dictionary by using json.loads method. example: df[colName] = json.loads(df[colName]).
pandasDF = sparkDF.toPandas()

Correção recomendada

pandas_df = sparkDF.toPandas()

## check/convert all resulting fields from calling toPandas when they are of
## type ArrayType,
## they will be reasigned by converting them into a Python Dictionary
## using json.loads method​

for field in pandas_df.schema.fields:
    if isinstance(field.datatype, ArrayType):
        pandas_df[field.name] = pandas_df[field.name].apply(lambda x: json.loads(x) if x is not None else x)

Recomendações adicionais

SPRKPY1069

Mensagem: Se o parâmetro partitionBy for uma lista, o Snowpark lançará um erro.

Categoria: Aviso

Descrição

When there is a usage of pyspark.sql.readwriter.DataFrameWriter.parquet method where it comes to the parameter partitionBy, the tool shows the EWI.

This is because in Snowpark the DataFrameWriter.parquet only supports a ColumnOrSqlExpr as a partitionBy parameter.

Cenários

Cenário 1

Código de entrada:

Para esse cenário, o parâmetro partitionBy não é uma lista.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

df.write.parquet(file_path, partitionBy="age")

Código de saída:

The tool adds the EWI SPRKPY1069 to let you know that Snowpark throws an error if parameter is a list.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw and error.
df.write.parquet(file_path, partition_by = "age", format_type_options = dict(compression = "None"))

Correção recomendada

There is not a recommended fix for this scenario because the tool always adds this EWI just in case the partitionBy parameter is a list. Remember that in Snowpark, only accepts cloud locations using a snowflake stage.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
Session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}').show()
Session.file.put(f"file:///path/to/data/file.parquet", f"@{stage}")

df.write.parquet(stage, partition_by = "age", format_type_options = dict(compression = "None"))
Cenário 2

Código de entrada:

Para esse cenário, o parâmetro partitionBy é uma lista.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

df.write.parquet(file_path, partitionBy=["age", "name"])

Código de saída:

The tool adds the EWI SPRKPY1069 to let you know that Snowpark throws an error if parameter is a list.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw and error.
df.write.parquet(file_path, partition_by = ["age", "name"], format_type_options = dict(compression = "None"))

Correção recomendada

If the value of the parameter is a list, then replace it with a ColumnOrSqlExpr.

df.write.parquet(file_path, partition_by = sql_expr("age || name"), format_type_options = dict(compression = "None"))

Recomendações adicionais

SPRKPY1070

Message: The mode argument is transformed to overwrite, check the variable value and set the corresponding bool value.

Categoria: Aviso

Descrição

Quando há um uso de:

The tool analyzes the parameter mode to determinate if the value is overwrite.

Cenários

Cenário 1

Código de entrada

Para esse cenário, a ferramenta detecta que o parâmetro mode pode definir o valor bool correspondente.

df.write.csv(file_path, mode="overwrite")

Código de saída:

The SMA tool analyzes the mode parameter, determinate that the value is overwrite and set the corresponding bool value

df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)

Correção recomendada

Não há uma correção recomendada para esse cenário porque a ferramenta realizou a transformação correspondente.

Cenário 2:

Código de entrada

In this scenario the tool can not validate the value is overwrite.

df.write.csv(file_path, mode=myVal)

Código de saída:

O SMA adiciona uma mensagem EWI indicando que o parâmetro mode foi transformado em “overwrite”, mas também serve para que você saiba que é melhor verificar o valor da variável e definir o valor bool correto.

#EWI: SPRKPY1070 => The 'mode' argument is transformed to 'overwrite', check the variable value and set the corresponding bool value.
df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = myVal)

Correção recomendada

Check for the value of the parameter mode and add the correct value for the parameter overwrite.

df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)

Recomendações adicionais

SPRKPY1071

Mensagem: A função pyspark.rdd.RDD.getNumPartitions não é necessária no Snowpark. Portanto, você deve remover todas as referências.

Categoria: Aviso

Descrição

This issue appears when the tool finds the use of the pyspark.rdd.RDD.getNumPartitions function. Snowflake uses micro-partitioning mechanism, so the use of this function is not required.

Cenário

Entrada

O getNumPartitions retorna a quantidade de partições em um RDD.

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])

print(df.getNumPartitions())

Saída

A ferramenta adiciona esse EWI para que você saiba que o getNumPartitions não é necessário.

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])
#EWI: SPRKPY1071 => The getNumPartitions are not required in Snowpark. So, you should remove all references.

print(df.getNumPartitions())

Correção recomendada

Remover todos os usos dessa função.

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])

Recomendações adicionais

SPRKPY1072

Mensagem: O uso do StorageLevel não é obrigatório no Snowpark.

Categoria: Aviso.

Descrição

This issue appears when the tool finds the use of the StorageLevel class, which works like «flags» to set the storage level. Since Snowflake controls the storage, the use of this function is not required.

Recomendações adicionais

SPRKPY1073

Mensagem: pyspark.sql.functions.udf sem parâmetros ou parâmetro de tipo de retorno não são suportados

Categoria: Aviso.

Descrição

This issue appears when the tool detects the usage of pyspark.sql.functions.udf as function or decorator and is not supported in two specifics cases, when it has no parameters or return type parameter.

Cenários

Cenário 1

Entrada

No Pyspark, você pode criar uma Função Definida pelo Usuário sem parâmetros de entrada ou de tipo de retorno:

from pyspark.sql import SparkSession, DataFrameStatFunctions
from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

my_udf = udf(lambda s: len(s))
df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Saída

O Snowpark requer os tipos de entrada e retorno para a função Udf. Porque eles não são fornecidos e o SMA não pode usar esses parâmetros.

from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1073 => pyspark.sql.functions.udf function without the return type parameter is not supported. See documentation for more info.
my_udf = udf(lambda s: len(s))

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Correção recomendada

To fix this scenario is required to add the import for the returns types of the input and output, and then the parameters of return*type and input_types[] on the udf function _my_udf*.

from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

my_udf = udf(lambda s: len(s), return_type=IntegerType(), input_types=[StringType()])

df.with_column("result", my_udf(df.Value)).show()
Cenário 2

No PySpark, você pode usar um decorador @udf sem parâmetros

Entrada

from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

@udf()
def my_udf(str):
    return len(str)


df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Saída

In Snowpark all the parameters of a udf decorator are required.

from snowflake.snowpark.functions import col, udf

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

#EWI: SPRKPY1073 => pyspark.sql.functions.udf decorator without parameters is not supported. See documentation for more info.

@udf()
def my_udf(str):
    return len(str)

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Correção recomendada

To fix this scenario is required to add the import for the returns types of the input and output, and then the parameters of return_type and input_types[] on the udf @udf decorator.

from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

@udf(return_type=IntegerType(), input_types=[StringType()])
def my_udf(str):
    return len(str)

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Recomendações adicionais

SPRKPY1074

Mensagem: O arquivo tem indentação mista (espaços e tabulações).

Categoria: Erro de análise.

Descrição

Esse problema aparece quando a ferramenta detecta que o arquivo tem um recuo misto. Isso significa que o arquivo tem uma combinação de espaços e tabulações para recuar as linhas de código.

Cenário

Entrada

No Pyspark, você pode misturar espaços e tabulações para o nível de identificação.

def foo():
    x = 5 # spaces
    y = 6 # tab

Saída

O SMA não pode lidar com marcadores de recuo mistos. Quando isso é detectado em um arquivo de código python, o SMA adiciona o EWI SPRKPY1074 na primeira linha.

## EWI: SPRKPY1074 => File has mixed indentation (spaces and tabs).
## This file was not converted, so it is expected to still have references to the Spark API
def foo():
    x = 5 # spaces
    y = 6 # tabs

Correção recomendada

A solução é fazer com que todos os símbolos de recuo sejam iguais.

def foo():
  x = 5 # tab
  y = 6 # tab

Recomendações adicionais

SPRKPY1075

Categoria

Aviso.

Descrição

O parse_json não aplica validação de esquema; se precisar filtrar/validar com base no esquema, talvez seja necessário introduzir alguma lógica.

Exemplo

Entrada

df.select(from_json(df.value, Schema))
df.select(from_json(schema=Schema, col=df.value))
df.select(from_json(df.value, Schema, option))

Saída

#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))

Para a função from_json, o esquema não é realmente passado para inferência, mas é usado para validação. Veja estes exemplos:

data = [
    ('{"name": "John", "age": 30, "city": "New York"}',),
    ('{"name": "Jane", "age": "25", "city": "San Francisco"}',)
]

df = spark.createDataFrame(data, ["json_str"])

Exemplo 1: Aplicar tipos de dados e alterar nomes de colunas:

## Parse JSON column with schema
parsed_df = df.withColumn("parsed_json", from_json(col("json_str"), schema))

parsed_df.show(truncate=False)

## +------------------------------------------------------+---------------------------+
## |json_str                                              |parsed_json                |
## +------------------------------------------------------+---------------------------+
## |{"name": "John", "age": 30, "city": "New York"}       |{John, 30, New York}       |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, null, San Francisco}|
## +------------------------------------------------------+---------------------------+
## notice that values outside of the schema were dropped and columns not matched are returned as null

Exemplo 2: Selecionar colunas específicas:

## Define a schema with only the columns we want to use
partial_schema = StructType([
    StructField("name", StringType(), True),
    StructField("city", StringType(), True)
])

## Parse JSON column with partial schema
partial_df = df.withColumn("parsed_json", from_json(col("json_str"), partial_schema))

partial_df.show(truncate=False)

## +------------------------------------------------------+---------------------+
## |json_str                                              |parsed_json          |
## +------------------------------------------------------+---------------------+
## |{"name": "John", "age": 30, "city": "New York"}       |{John, New York}     |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, San Francisco}|
## +------------------------------------------------------+---------------------+
## there is also an automatic filtering

Recomendações

  • For more support, you can email us at sma-support@snowflake.com. If you have a contract for support with Snowflake, reach out to your sales engineer and they can direct your support needs.

  • Useful tools PEP-8 and Reindent.

SPRKPY1076

Message: Parameters in pyspark.sql.readwriter.DataFrameReader methods are not supported. This applies to CSV, JSON and PARQUET methods.

Categoria: Aviso.

Descrição

For the CSV, JSON and PARQUET methods on the pyspark.sql.readwriter.DataFrameReader object, the tool will analyze the parameters and add a transformation according to each case:

  • Todos os parâmetros correspondem ao seu nome equivalente no Snowpark: nesse caso, a ferramenta transformará o parâmetro em uma chamada .option(). Nesse caso, o parâmetro não adicionará esse EWI.

  • Alguns parâmetros não correspondem ao equivalente no Snowpark: nesse caso, a ferramenta adicionará esse EWI com as informações do parâmetro e o removerá da chamada do método.

Lista de equivalências:

  • Equivalências para CSV:

Chaves do Spark

Equivalências do Snowpark

sep

FIELD_DELIMITER

header

PARSE_HEADER

lineSep

RECORD_DELIMITER

pathGlobFilter

PATTERN

quote

FIELD_OPTIONALLY_ENCLOSED_BY

nullValue

NULL_IF

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

inferSchema

INFER_SCHEMA

delimiter

FIELD_DELIMITER

  • Equivalências para JSON:

Chaves do Spark

Equivalências do Snowpark

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

pathGlobFilter

PATTERN

  • Equivalências para PARQUET:

Chaves do Spark

Equivalências do Snowpark

pathGlobFilter

PATTERN

Cenários

Cenário 1

Entrada

Para CVS, aqui estão alguns exemplos:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('myapp').getOrCreate()

spark.read.csv("path3", None,None,None,None,None,None,True).show()

Saída

No código convertido, os parâmetros são adicionados como opções individuais à função cvs

from snowflake.snowpark import Session

spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the csv function, the supported ones will be added into a option method.
spark.read.option("FIELD_DELIMITER", None).option("PARSE_HEADER", True).option("FIELD_OPTIONALLY_ENCLOSED_BY", None).csv("path3").show()

Cenário 2

Entrada

Para JSON, aqui estão alguns exemplos:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()
spark.read.json("/myPath/jsonFile/", dateFormat='YYYY/MM/DD').show()

Saída

No código convertido, os parâmetros são adicionados como opções individuais à função json

from snowflake.snowpark import Session
spark = Session.builder.app_name('myapp', True).getOrCreate()
#EWI: SPRKPY1076 => Some of the included parameters are not supported in the json function, the supported ones will be added into a option method.

spark.read.option("DATE_FORMAT", 'YYYY/MM/DD').json("/myPath/jsonFile/").show()
Cenário 3

Entrada

Para PARQUET, aqui estão alguns exemplos:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()

spark.read.parquet("/path/to/my/file.parquet", pathGlobFilter="*.parquet").show()

Saída

No código convertido, os parâmetros são adicionados como opções individuais à função parquet

from snowflake.snowpark import Session

spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => The parquet function require adjustments, in Snowpark the parquet files needs to be located in an stage. See the documentation for more info.

spark.read.option("PATTERN", "*.parquet").parquet("/path/to/my/file.parquet")

Recomendações adicionais

SPRKPY1077

Mensagem: o código SQL incorporado não pode ser processado.

Categoria: Aviso.

Descrição

Esse problema aparece quando a ferramenta detecta um código SQL incorporado que não pode ser convertido para o Snowpark.

Consulte a seção de código SQL incorporado para obter mais informações.

Cenário

Entrada

Neste exemplo, o código SQL está incorporado em uma variável chamada query, que é usada como parâmetro para o método Pyspark.sql.

query = f"SELECT * from myTable"
spark.sql(query)

Saída

O SMA detecta que o parâmetro PySpark.sql é uma variável e não um código SQL, portanto, a mensagem EWI SPRKPY1077 é adicionada à linha PySpark.sql.

query = f"SELECT * myTable"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
spark.sql(query)

Recomendações adicionais

  • Para a transformação de SQL, esse código deve estar diretamente dentro como parâmetro do método apenas como valores de cadeia de caracteres e sem interpolação. Verifique o envio de SQL para a função PySpark.SQL para validar sua funcionalidade no Snowflake.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1078

Mensagem: O argumento da função pyspark.context.SparkContext.setLogLevel não é um valor literal e, portanto, não pôde ser avaliado

Categoria: Aviso

Descrição

This issue appears when the SMA detects the use of the pyspark.context.SparkContext.setLogLevel function with an argument that is not a literal value, for example, when the argument is a variable.

O SMA faz uma análise estática do seu código-fonte e, portanto, não é possível avaliar o conteúdo desse argumento e determinar um equivalente no Snowpark.

Cenário

Entrada

Neste exemplo, o logLevel é definido na variável my_log_level e, em seguida, my_log_level é usado como parâmetro pelo método setLogLevel.

my_log_level = "WARN"
sparkSession.sparkContext.setLogLevel(my_log_level)

Saída

O SMA não consegue avaliar o argumento do parâmetro de nível de registro, portanto, o EWI SPRKPY1078 é adicionado sobre a linha do registro transformado:

my_log_level = "WARN"
#EWI: SPRKPY1078 => my_log_level is not a literal value and therefore could not be evaluated. Make sure the value of my_log_level is a valid level in Snowpark. Valid log levels are: logging.CRITICAL, logging.DEBUG, logging.ERROR, logging.INFO, logging.NOTSET, logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)

Correção recomendada

Even though the SMA was unable to evaluate the argument, it will transform the pyspark.context.SparkContext.setLogLevel function into the Snowpark equivalent. Please make sure the value of the level argument in the generated output code is a valid and equivalent log level in Snowpark according to the table below:

Nível de registro do PySpark

Nível de registro do Snowpark equivalente

ALL

logging.NOTSET

DEBUG

logging.DEBUG

ERROR

logging.ERROR

FATAL

logging.CRITICAL

INFO

logging.INFO

OFF

logging.WARNING

TRACE

logging.NOTSET

WARN

logging.WARNING

Assim, a correção recomendada será semelhante:

my_log_level = logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)

Recomendações adicionais

SPRKPY1079

Mensagem: O argumento da função pyspark.context.SparkContext.setLogLevel não é um nível de registro PySpark válido

Categoria: Aviso

Descrição

This issue appears when the SMA detects the use of the pyspark.context.SparkContext.setLogLevel function with an argument that is not a valid log level in PySpark, and therefore an equivalent could not be determined in Snowpark.

Cenário

Entrada

aqui o nível de registro usa «INVALID_LOG_LEVEL», que não é um nível de registro válido do Pyspark.

sparkSession.sparkContext.setLogLevel("INVALID_LOG_LEVEL")

Saída

O SMA não consegue reconhecer o nível de registro «INVALID_LOG_LEVEL», embora o SMA faça a conversão, o EWI SPRKPY1079 é adicionado para indicar um possível problema.

#EWI: SPRKPY1079 => INVALID_LOG_LEVEL is not a valid PySpark log level, therefore an equivalent could not be determined in Snowpark. Valid PySpark log levels are: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
logging.basicConfig(stream = sys.stdout, level = logging.INVALID_LOG_LEVEL)

Correção recomendada

Make sure that the log level used in the pyspark.context.SparkContext.setLogLevel function is a valid log level in PySpark or in Snowpark and try again.

logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)

Recomendações adicionais

SPRKPY1081

This issue code has been deprecated since Spark Conversion Core 4.12.0

Mensagem: pyspark.sql.readwriter.DataFrameWriter.partitionBy tem uma solução alternativa.

Categoria: Aviso

Descrição

The Pyspark.sql.readwriter.DataFrameWriter.partitionBy function is not supported. The workaround is to use Snowpark’s copy_into_location instead. See the documentation for more info.

Cenário

Entrada

This code will create a separate directories for each unique value in the FIRST_NAME column. The data is the same, but it’s going to be stored in different directories based on the column.

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")

This code will create a separate directories for each unique value in the FIRST_NAME column. The data is the same, but it’s going to be stored in different directories based on the column.

Código de saída

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
#EWI: SPRKPY1081 => The partitionBy function is not supported, but you can instead use copy_into_location as workaround. See the documentation for more info.
df.write.partitionBy("FIRST_NAME").csv("/home/data", format_type_options = dict(compression = "None"))

Correção recomendada

In Snowpark, copy_into_location has a partition_by parameter that you can use instead of the partitionBy function, but it’s going to require some manual adjustments, as shown in the following example:

Código do Spark:

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")

O código do Snowpark foi ajustado manualmente:

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.copy_into_location(location=temp_stage, partition_by=col("FIRST_NAME"), file_format_type="csv", format_type_options={"COMPRESSION": "NONE"}, header=True)

copy_into_location tem os seguintes parâmetros

  • location: The Snowpark location only accepts cloud locations using an snowflake stage.

  • _partition_by_: Pode ser um nome de coluna ou uma expressão SQL, portanto, você precisará convertê-la em uma coluna ou em uma expressão SQL, usando col ou sql_expr.

Recomendações adicionais

SPRKPY1082

Mensagem: A função pyspark.sql.readwriter.DataFrameReader.load não é suportada. Uma solução alternativa é usar o método específico do formato do Snowpark DataFrameReader (avro csv, json, orc, parquet). O parâmetro path deve ser um local de estágio.

Categoria: Aviso

Descrição

The pyspark.sql.readwriter.DataFrameReader.load function is not supported. The workaround is to use Snowpark DataFrameReader methods instead.

Cenários

The spark signature for this method DataFrameReader.load(path, format, schema, **options) does not exist in Snowpark. Therefore, any usage of the load function is going to have an EWI in the output code.

Cenário 1

Entrada

Below is an example that tries to load data from a CSV source.

path_csv_file = "/path/to/file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_csv_file, "csv").show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()

Saída

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_csv_file = "/path/to/file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_csv_file, "csv").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
#EWI: The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()

Correção recomendada

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with csv method.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporal stage and puts the file into it, then calls the CSV method.

path_csv_file = "/path/to/file.csv"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.csv(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).csv(temp_stage)
  • Fixing options parameter:

The options between spark and snowpark are not the same, in this case lineSep and dateFormat are replaced with RECORD_DELIMITER and DATE_FORMAT, the Additional recommendations section has a table with all the Equivalences.

Below is an example that creates a dictionary with RECORD_DELIMITER and DATE_FORMAT, and calls the options method with that dictionary.

optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}
df = my_session.read.options(optionsParam).csv(stage)

Cenário 2

Entrada

Below is an example that tries to load data from a JSON source.

path_json_file = "/path/to/file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_json_file, "json").show()
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()

Saída

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_json_file = "/path/to/file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_json_file, "json").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()

Correção recomendada

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with json method

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporal stage and puts the file into it, then calls the JSON method.

path_json_file = "/path/to/file.json"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.json(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).json(temp_stage)
  • Fixing options parameter:

The options between Spark and snowpark are not the same, in this case dateFormat and timestampFormat are replaced with DATE_FORMAT and TIMESTAMP_FORMAT, the Additional recommendations section has a table with all the Equivalences.

Below is an example that creates a dictionary with DATE_FORMAT and TIMESTAMP_FORMAT, and calls the options method with that dictionary.

optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}
df = Session.read.options(optionsParam).json(stage)

Cenário 3

Entrada

Below is an example that tries to load data from a PARQUET source.

path_parquet_file = "/path/to/file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_parquet_file, "parquet").show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()

Saída

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_parquet_file = "/path/to/file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_parquet_file, "parquet").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()

Correção recomendada

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with parquet method

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporal stage and puts the file into it, then calls the PARQUET method.

path_parquet_file = "/path/to/file.parquet"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.parquet(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).parquet(temp_stage)
  • Fixing options parameter:

The options between Spark and snowpark are not the same, in this case pathGlobFilter is replaced with PATTERN, the Additional recommendations section has a table with all the Equivalences.

Below is an example that creates a dictionary with PATTERN, and calls the options method with that dictionary.

optionsParam = {"PATTERN": "*.parquet"}
df = Session.read.options(optionsParam).parquet(stage)

Recomendações adicionais

  • Leve em conta que as opções entre o spark e o snowpark não são as mesmas, mas podem ser mapeadas:

Opções do Spark

Valor possível

Equivalente do Snowpark

Descrição

header

Verdadeiro ou falso

SKIP_HEADER = 1 / SKIP_HEADER = 0

Para usar a primeira linha de um arquivo como nomes de colunas.

delimiter

Qualquer separador de campo de um ou vários caracteres

FIELD_DELIMITER

Para especificar caractere(s) único(s)/múltiplo(s) como separador para cada coluna/campo.

sep

Qualquer separador de campo de caractere único

FIELD_DELIMITER

Para especificar um único caractere como separador para cada coluna/campo.

encoding

UTF-8, UTF-16, etc…

ENCODING

Para decodificar os arquivos CSV pelo tipo de codificação fornecido. A codificação padrão é UTF-8

lineSep

Qualquer separador de linha de caractere único

RECORD_DELIMITER

Para definir o separador de linha que deve ser usado na análise de arquivos.

pathGlobFilter

Padrão de arquivo

PATTERN

Para definir um padrão para ler arquivos somente com nomes de arquivos que correspondam ao padrão.

recursiveFileLookup

Verdadeiro ou falso

N/A

Para examinar recursivamente um diretório para ler arquivos. O valor padrão dessa opção é False.

quote

Caractere único a ser citado

FIELD_OPTIONALLY_ENCLOSED_BY

Para citar campos/colunas que contêm campos em que o delimitador/separador pode fazer parte do valor. Esse caractere Para citar todos os campos quando usado com a opção quoteAll. O valor padrão dessa opção é aspas duplas («).

nullValue

Cadeia de caracteres para substituir null

NULL_IF

Para substituir os valores nulos pela cadeia de caracteres durante a leitura e gravação do dataframe.

dateFormat

Formato de data válido

DATE_FORMAT

Para definir uma cadeia de caracteres que indica um formato de data. O formato padrão é yyyy-MM-dd.

timestampFormat

Formato de carimbo de data/hora válido

TIMESTAMP_FORMAT

Para definir uma cadeia de caracteres que indica um formato de carimbo de data/hora. O formato padrão é yyyy-MM-dd “T’HH:mm:ss.

escape

Qualquer caractere único

ESCAPE

Para definir um único caractere como caractere de escape para substituir o caractere de escape padrão (\).

inferSchema

Verdadeiro ou falso

INFER_SCHEMA

Detecta automaticamente o esquema do arquivo

mergeSchema

Verdadeiro ou falso

N/A

Não é necessário no snowflake, pois isso acontece sempre que o infer_schema determina a estrutura do arquivo parquet

  • For modifiedBefore / modifiedAfter option you can achieve the same result in Snowflake by using the metadata columns and then adding a filter like: df.filter(METADATA_FILE_LAST_MODIFIED > ‘some_date’).

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1083

Mensagem: A função pyspark.sql.readwriter.DataFrameWriter.save não é suportada. Uma solução alternativa é usar o método copy_into_location do Snowpark DataFrameWriter.

Categoria: Aviso

Descrição

The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. The workaround is to use Snowpark DataFrameWriter methods instead.

Cenários

The spark signature for this method DataFrameWriter.save(path, format, mode, partitionBy, **options) does not exists in Snowpark. Therefore, any usage of the load function it’s going to have an EWI in the output code.

Cenário 1

Código de entrada

Below is an example that tries to save data with CSV format.

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_csv_file, format="csv")
df.write.save(path_csv_file, format="csv", mode="overwrite")
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")

Código de saída

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")

Correção recomendada

As a workaround you can use Snowpark DataFrameWriter methods instead.

  • Fixing path and format parameters:

    • Replace the load method with csv or copy_into_location method.

    • If you are using copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Abaixo está um exemplo que cria um estágio temporal e coloca o arquivo nele, depois chama um dos métodos mencionados acima.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"

## Using csv method
df.write.csv(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="csv")

Below is an example that adds into the daisy chain the mode method with overwrite as a parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using csv method
df.write.mode("overwrite").csv(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="csv")
  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the CSV method, as follows:

Below is an example that used the partition_by parameter from the CSV method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using csv method
df.write.csv(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="csv", partition_by="City")
  • Fixing options parameter:

The options between spark and snowpark are not the same, in this case lineSep and dateFormat are replaced with RECORD_DELIMITER and DATE_FORMAT, the Additional recommendations section has table with all the Equivalences.

Below is an example that creates a dictionary with RECORD_DELIMITER and DATE_FORMAT, and calls the options method with that dictionary.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}

## Using csv method
df.write.csv(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.csv(stage, file_format_type="csv", format_type_options=optionsParam)

Cenário 2

Código de entrada

Below is an example that tries to save data with JSON format.

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_json_file, format="json")
df.write.save(path_json_file, format="json", mode="overwrite")
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")

Código de saída

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")

Correção recomendada

As a workaround you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with json or copy_into_location method

    • If you are using copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Abaixo está um exemplo que cria um estágio temporal e coloca o arquivo nele, depois chama um dos métodos mencionados acima.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"

## Using json method
df.write.json(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="json")

Below is an example that adds into the daisy chain the mode method with overwrite as a parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using json method
df.write.mode("overwrite").json(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="json")
  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the CSV method, as follows:

Below is an example that used the partition_by parameter from the CSV method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using json method
df.write.json(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="json", partition_by="City")
  • Fixing options parameter:

The options between spark and snowpark are not the same, in this case dateFormat and timestampFormat are replaced with DATE_FORMAT and TIMESTAMP_FORMAT, the Additional recommendations section has table with all the Equivalences.

Below is an example that creates a dictionary with DATE_FORMAT and TIMESTAMP_FORMAT, and calls the options method with that dictionary.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}

## Using json method
df.write.json(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="json", format_type_options=optionsParam)

Cenário 3

Código de entrada

Below is an example that tries to save data with PARQUET format.

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_parquet_file, format="parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")

Código de saída

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")

Correção recomendada

As a workaround you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with parquet or copy_into_location method.

    • If you are using copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Abaixo está um exemplo que cria um estágio temporal e coloca o arquivo nele, depois chama um dos métodos mencionados acima.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.parquet"

## Using parquet method
df.write.parquet(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet")

Below is an example that adds into the daisy chain the mode method with overwrite as a parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using parquet method
df.write.mode("overwrite").parquet(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(stage, file_format_type="parquet")
  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the CSV method, as follows:

Below is an example that used the partition_by parameter from the parquet method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using parquet method
df.write.parquet(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet", partition_by="City")
  • Fixing options parameter:

The options between spark and snowpark are not the same, in this case pathGlobFilter is replaced with PATTERN, the Additional recommendations section has table with all the Equivalences.

Below is an example that creates a dictionary with PATTERN, and calls the options method with that dictionary.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"PATTERN": "*.parquet"}

## Using parquet method
df.write.parquet(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet", format_type_options=optionsParam)

Recomendações adicionais

  • Leve em conta que as opções entre o spark e o snowpark não são as mesmas, mas podem ser mapeadas:

Opções do Spark

Valor possível

Equivalente do Snowpark

Descrição

header

Verdadeiro ou falso

SKIP_HEADER = 1 / SKIP_HEADER = 0

Para usar a primeira linha de um arquivo como nomes de colunas.

delimiter

Qualquer separador de campo de um ou vários caracteres

FIELD_DELIMITER

Para especificar caractere(s) único(s)/múltiplo(s) como separador para cada coluna/campo.

sep

Qualquer separador de campo de caractere único

FIELD_DELIMITER

Para especificar um único caractere como separador para cada coluna/campo.

encoding

UTF-8, UTF-16, etc…

ENCODING

Para decodificar os arquivos CSV pelo tipo de codificação fornecido. A codificação padrão é UTF-8

lineSep

Qualquer separador de linha de caractere único

RECORD_DELIMITER

Para definir o separador de linha que deve ser usado na análise de arquivos.

pathGlobFilter

Padrão de arquivo

PATTERN

Para definir um padrão para ler arquivos somente com nomes de arquivos que correspondam ao padrão.

recursiveFileLookup

Verdadeiro ou falso

N/A

Para examinar recursivamente um diretório para ler arquivos. O valor padrão dessa opção é False.

quote

Caractere único a ser citado

FIELD_OPTIONALLY_ENCLOSED_BY

Para citar campos/colunas que contêm campos em que o delimitador/separador pode fazer parte do valor. Esse caractere Para citar todos os campos quando usado com a opção quoteAll. O valor padrão dessa opção é aspas duplas («).

nullValue

Cadeia de caracteres para substituir null

NULL_IF

Para substituir os valores nulos pela cadeia de caracteres durante a leitura e gravação do dataframe.

dateFormat

Formato de data válido

DATE_FORMAT

Para definir uma cadeia de caracteres que indica um formato de data. O formato padrão é yyyy-MM-dd.

timestampFormat

Formato de carimbo de data/hora válido

TIMESTAMP_FORMAT

Para definir uma cadeia de caracteres que indica um formato de carimbo de data/hora. O formato padrão é yyyy-MM-dd “T’HH:mm:ss.

escape

Qualquer caractere único

ESCAPE

Para definir um único caractere como caractere de escape para substituir o caractere de escape padrão (\).

inferSchema

Verdadeiro ou falso

INFER_SCHEMA

Detecta automaticamente o esquema do arquivo

mergeSchema

Verdadeiro ou falso

N/A

Não é necessário no snowflake, pois isso acontece sempre que o infer_schema determina a estrutura do arquivo parquet

  • For modifiedBefore / modifiedAfter option you can achieve the same result in Snowflake by using the metadata columns and then add a filter like: df.filter(METADATA_FILE_LAST_MODIFIED > ‘some_date’).

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1084

This issue code has been deprecated since Spark Conversion Core 4.12.0

Mensagem: pyspark.sql.readwriter.DataFrameWriter.option não é compatível.

Categoria: Aviso

Descrição

The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.

Cenário

Código de entrada

Below is an example using the option method, this method is used to add additional configurations when writing the data of a DataFrame.

path_csv_file = "/path/to/file.csv"
data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.option("header", True).csv(csv_file_path)
df.write.option("sep", ";").option("lineSep","-").csv(csv_file_path)

Código de saída

The tool adds this EWI SPRKPY1084 on the output code to let you know that this function is not supported by Snowpark.

path_csv_file = "/path/to/file.csv"
data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.

df.write.option("header", True).csv(csv_file_path)
#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.
df.write.option("sep", ";").option("lineSep","-").csv(csv_file_path)

Correção recomendada

O método pyspark.sql.readwriter.DataFrameWriter.option não tem uma correção recomendada.

Recomendações adicionais

SPRKPY1085

Mensagem: pyspark.ml.feature.VectorAssembler não é compatível.

Categoria: Aviso

Descrição

The pyspark.ml.feature.VectorAssembler is not supported.

Cenário

Código de entrada

VectorAssembler é usado para combinar várias colunas em um único vetor.

data = [
        (1, 10.0, 20.0),
        (2, 25.0, 30.0),
        (3, 50.0, 60.0)
    ]

df = SparkSession.createDataFrame(data, schema=["Id", "col1", "col2"])
vector = VectorAssembler(inputCols=["col1", "col2"], output="cols")

Código de saída

The tool adds this EWI SPRKPY1085 on the output code to let you know that this class is not supported by Snowpark.

data = [
        (1, 10.0, 20.0),
        (2, 25.0, 30.0),
        (3, 50.0, 60.0)
    ]

df = spark.createDataFrame(data, schema=["Id", "col1", "col2"])
#EWI: SPRKPY1085 => The pyspark.ml.feature.VectorAssembler function is not supported.

vector = VectorAssembler(inputCols=["col1", "col2"], output="cols")

Correção recomendada

O pyspark.ml.feature.VectorAssembler não tem uma correção recomendada.

Recomendações adicionais

SPRKPY1086

Mensagem: pyspark.ml.linalg.VectorUDT não é compatível.

Categoria: Aviso

Descrição

The pyspark.ml.linalg.VectorUDT is not supported.

Cenário

Código de entrada

VectorUDT é um tipo de dados para representar colunas de vetores em um DataFrame.

data = [
        (1, Vectors.dense([10.0, 20.0])),
        (2, Vectors.dense([25.0, 30.0])),
        (3, Vectors.dense([50.0, 60.0]))
    ]

schema = StructType([
        StructField("Id", IntegerType(), True),
        StructField("VectorCol", VectorUDT(), True),
    ])

df = SparkSession.createDataFrame(data, schema=schema)

Código de saída

The tool adds this EWI SPRKPY1086 on the output code to let you know that this function is not supported by Snowpark.

data = [
        (1, Vectors.dense([10.0, 20.0])),
        (2, Vectors.dense([25.0, 30.0])),
        (3, Vectors.dense([50.0, 60.0]))
    ]

#EWI: SPRKPY1086 => The pyspark.ml.linalg.VectorUDT function is not supported.
schema = StructType([
        StructField("Id", IntegerType(), True),
        StructField("VectorCol", VectorUDT(), True),
    ])

df = spark.createDataFrame(data, schema=schema)

Correção recomendada

O pyspark.ml.linalg.VectorUDT não tem uma correção recomendada.

Recomendações adicionais

SPRKPY1087

Mensagem: A função pyspark.sql.dataframe.DataFrame.writeTo não é suportada, mas tem uma solução alternativa.

Categoria: Aviso.

Descrição

The pyspark.sql.dataframe.DataFrame.writeTo function is not supported. The workaround is to use Snowpark DataFrameWriter SaveAsTable method instead.

Cenário

Entrada

Below is an example of a use of the pyspark.sql.dataframe.DataFrame.writeTo function, the dataframe df is written into a table name Personal_info.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

df.writeTo("Personal_info")

Saída

The SMA adds the EWI SPRKPY1087 to the output code to let you know that this function is not supported, but has a workaround.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

#EWI: SPRKPY1087 => pyspark.sql.dataframe.DataFrame.writeTo is not supported, but it has a workaround.
df.writeTo("Personal_info")

Correção recomendada

A solução alternativa é usar o método do Snowpark DataFrameWriter SaveAsTable.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

df.write.saveAsTable("Personal_info")

Recomendações adicionais

SPRKPY1088

Mensagem: Os valores de pyspark.sql.readwriter.DataFrameWriter.option no Snowpark podem ser diferentes, portanto, pode ser necessária uma validação.

Categoria: Aviso

Descrição

The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so validation might be needed to ensure that the behavior is correct.

Cenários

Há alguns cenários, dependendo da opção ser suportada ou não, ou do formato usado para gravar o arquivo.

Cenário 1

Entrada

Below is an example of the usage of the method option, adding a sep option, which is currently supported.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("sep", ",").csv("some_path")

Saída

The tool adds the EWI SPRKPY1088 indicating that it is required validation.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")

Correção recomendada

O Snowpark API oferece suporte a esse parâmetro, portanto, a única ação pode ser verificar o comportamento após a migração. Consulte a tabela Equivalências para ver os parâmetros compatíveis.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")
Cenário 2

Entrada

Here the scenario shows the usage of option, but adds a header option, which is not supported.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("header", True).csv("some_path")

Saída

The tool adds the EWI SPRKPY1088 indicating that it is required validation is needed.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("header", True).csv("some_path")

Correção recomendada

For this scenario it is recommended to evaluate the Snowpark format type options to see if it is possible to change it according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
Cenário 3

Entrada

This scenario adds a sep option, which is supported and uses the JSON method.

  • Note: this scenario also applies for PARQUET.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("sep", ",").json("some_path")

Saída

The tool adds the EWI SPRKPY1088 indicating that it is required validation is needed.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").json("some_path")

Correção recomendada

The file format JSON does not support the parameter sep, so it is recommended to evaluate the snowpark format type options to see if it is possible to change it according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")

Recomendações adicionais

  • Since there are some not supported parameters, it is recommended to check the table of equivalences and check the behavior after the transformation.

  • Tabela de equivalências:

Opção PySpark

Opção SnowFlake

Formatos de arquivo suportados

Descrição

SEP

FIELD_DELIMITER

CSV

Um ou mais caracteres de byte único ou de vários bytes que separam os campos em um arquivo de entrada.

LINESEP

RECORD_DELIMITER

CSV

Um ou mais caracteres que separam registros em um arquivo de entrada.

QUOTE

FIELD_OPTIONALLY_ENCLOSED_BY

CSV

Caractere usado para delimitar as cadeias de caracteres.

NULLVALUE

NULL_IF

CSV

String usada para converter de e para SQL NULL.

DATEFORMAT

DATE_FORMAT

CSV

String que define o formato dos valores de data nos arquivos de dados a serem carregados.

TIMESTAMPFORMAT

TIMESTAMP_FORMAT

CSV

String que define o formato dos valores de carimbo de data/hora nos arquivos de dados a serem carregados.

Se o parâmetro usado não estiver na lista, a API gera um erro.

SPRKPY1089

Mensagem: Os valores de pyspark.sql.readwriter.DataFrameWriter.options no Snowpark podem ser diferentes, portanto, pode ser necessária uma validação.

Categoria: Aviso

Descrição

The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so validation might be needed to ensure that the behavior is correct.

Cenários

Há alguns cenários, dependendo do fato de as opções terem suporte ou não, ou do formato usado para gravar o arquivo.

Cenário 1

Entrada

Below is an example of the usage of the method options, adding the options sep and nullValue, which are currently supported.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(nullValue="myVal", sep=",").csv("some_path")

Saída

The tool adds the EWI SPRKPY1089 indicating that it is required validation.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")

Correção recomendada

O Snowpark API é compatível com esses parâmetros, portanto, a única ação a ser tomada é verificar o comportamento após a migração. Consulte a tabela Equivalências para ver os parâmetros compatíveis.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")
Cenário 2

Entrada

Here the scenario shows the usage of options, but adds a header option, which is not supported.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(header=True, sep=",").csv("some_path")

Saída

The tool adds the EWI SPRKPY1089 indicating that it is required validation is needed.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(header=True, sep=",").csv("some_path")

Correção recomendada

For this scenario it is recommended to evaluate the Snowpark format type options to see if it is possible to change it according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
Cenário 3

Entrada

This scenario adds a sep option, which is supported and uses the JSON method.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(nullValue="myVal", sep=",").json("some_path")

Saída

The tool adds the EWI SPRKPY1089 indicating that it is required validation is needed.

  • Note: this scenario also applies for PARQUET.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").json("some_path")

Correção recomendada

The file format JSON does not support the parameter sep, so it is recommended to evaluate the snowpark format type options to see if it is possible to change it according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")

Recomendações adicionais

  • Since there are some not supported parameters, it is recommended to check the table of equivalences and check the behavior after the transformation.

  • Tabela de equivalências:

O Snowpark pode oferecer suporte a uma lista de equivalências para alguns parâmetros:

Opção PySpark

Opção SnowFlake

Formatos de arquivo suportados

Descrição

SEP

FIELD_DELIMITER

CSV

Um ou mais caracteres de byte único ou de vários bytes que separam os campos em um arquivo de entrada.

LINESEP

RECORD_DELIMITER

CSV

Um ou mais caracteres que separam registros em um arquivo de entrada.

QUOTE

FIELD_OPTIONALLY_ENCLOSED_BY

CSV

Caractere usado para delimitar as cadeias de caracteres.

NULLVALUE

NULL_IF

CSV

String usada para converter de e para SQL NULL.

DATEFORMAT

DATE_FORMAT

CSV

String que define o formato dos valores de data nos arquivos de dados a serem carregados.

TIMESTAMPFORMAT

TIMESTAMP_FORMAT

CSV

String que define o formato dos valores de carimbo de data/hora nos arquivos de dados a serem carregados.

Se o parâmetro usado não estiver na lista, a API gera um erro.

SPRKPY1101

Categoria

Erro de análise.

Descrição

Quando a ferramenta reconhece um erro de análise, ela tenta se recuperar dele e continua o processo na próxima linha. Nesses casos, ele mostra o erro e os comentários sobre a linha.

Este exemplo mostra como é tratado um erro de incompatibilidade entre espaços e tabulações.

Código de entrada

def foo():
    x = 5 # Spaces
     y = 6 # Tab

def foo2():
    x=6
    y=7

Código de saída

def foo():
    x = 5 # Spaces
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(3, 2). Last valid token was '5' @(2, 9), failed token 'y' @(3, 2)
## y = 6 # Tab

def foo2():
    x=6
    y=7

Recomendações

  • Tente corrigir a linha comentada.

  • For more support, email us at sma-support@snowflake.com. If you have a support contract with Snowflake, reach out to your sales engineer, who can direct your support needs.