Guia de compatibilidade do Snowpark Connect for Spark¶
Este guia documenta a compatibilidade entre a implementação do Snowpark Connect for Spark de Spark DataFrame APIs e o Apache Spark nativo. O objetivo é ajudar os usuários a entender as principais diferenças, os recursos não suportados e as considerações de migração ao mover cargas de trabalho do Spark para. Snowpark Connect for Spark.
Snowpark Connect for Spark visa fornecer uma experiência familiar do Spark DataFrame API sobre o mecanismo de execução do Snowflake. Entretanto, há lacunas de compatibilidade descritas neste tópico. Este guia destaca essas diferenças para ajudar você a planejar e adaptar sua migração. Esses problemas podem ser abordados em um lançamento futuro.
DataTypes¶
Tipos de dados sem suporte¶
Conversão implícita do tipo de dados¶
Ao usar Snowpark Connect for Spark, tenha em mente como os tipos de dados são tratados. Snowpark Connect for Spark representa implicitamente ByteType, ShortType`e :code:`IntegerType como LongType. Isso significa que, embora você possa definir colunas ou dados com ByteType, ShortType`ou :code:`IntegerType, os dados serão representados e retornados por Snowpark Connect for Spark como LongType. Da mesma forma, a conversão implícita também pode ocorrer para FloatType e DoubleType dependendo das operações e do contexto específicos. O mecanismo de execução do Snowflake manipulará internamente a compactação do tipo de dados e poderá, de fato, armazenar os dados como Byte ou Short, mas esses são considerados detalhes de implementação e não são expostos ao usuário final.
Semanticamente, esta representação não afetará a exatidão de suas consultas do Spark.
Tipo de dados de PySpark nativo |
Tipo de dados de Snowpark Connect for Spark |
|---|---|
|
|
|
|
|
|
|
|
O exemplo a seguir mostra uma diferença entre como o Spark e o Snowpark Connect for Spark manipulam tipos de dados em resultados de consulta.
Consulta¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
(float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
(float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
Nuance de NullType¶
Snowpark Connect for Spark não oferece suporte ao tipo de dados NullType, que é um tipo de dados compatível no Spark. Isso causa mudanças de comportamento ao usar Null ou None em dataframes.
No Spark, um NULL literal (por exemplo, com lit(None)) é automaticamente inferido como um NullType. Em Snowpark Connect for Spark, ele é inferido como um StringType durante a inferência do esquema.
df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', :code:`NullType`(), True)
# Snowpark Connect for Spark: StructField('null_col', :code:`StringType`(), True)
Tipos de dados estruturados em ArrayType, MapType e ObjectType¶
Embora o suporte a tipos estruturados não esteja disponível por padrão em Snowpark Connect for Spark, os tipos de dados ARRAY, MAP e Object são tratados como coleções genéricas e sem tipo. Isso significa que não há aplicação de tipos de elementos, nomes de campos, esquema ou nulidade, ao contrário do que seria fornecido pelo suporte a tipos estruturados.
Se você tem uma dependência desse suporte, trabalhe com sua equipe de conta para habilitar esse recurso para sua conta.
Spark APIs sem suporte¶
A seguir estão as APIs suportadas pelo Spark clássico e Spark Connect, mas não suportadas no Snowpark Connect for Spark.
Dataframe.hint: Snowpark Connect for Spark ignora qualquer dica definida em um dataframe. O otimizador de consultas do Snowflake determina automaticamente a estratégia de execução mais eficiente.
DataFrame.repartition: Esta é uma operação não permitida em Snowpark Connect for Spark. O Snowflake gerencia automaticamente a distribuição e o particionamento de dados em sua infraestrutura de computação distribuída.
pyspark.RDD: RDD APIs não são compatíveis com o Spark Connect (incluindo Snowpark Connect for Spark).
Diferenças de UDF¶
Diferenças de StructType¶
Quando o Spark converte um StructType a ser usado em uma função definida pelo usuário (UDF), ele o converte em um tipo tuple em Python. Snowpark Connect for Spark converterá um StructType em um tipo dict em Python. Isso tem diferenças fundamentais no acesso e na saída do elemento.
O Spark acessará índices com 0, 1, 2, 3 e assim por diante.
Snowpark Connect for Spark acessará índices usando “_1”, “_2” e assim por diante.
def f(e):
return e[0]
df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:
def f(e):
return e['_1']
row = (
self.spark.range(1)
.selectExpr("struct(1, 2) as struct")
.select(
udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
)
.first()
)
self.assertEquals(row[0], Row(col1=1, col2=2))
# Spark: Row(col1=1, col2=2)
# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
Tipo de iterador em UDFs¶
O iterador não é compatível como tipo de retorno ou como tipo de entrada.
# This will not work
def func(iterator):
for _ in iterator:
...
df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Importing files to a Python UDF¶
With Snowpark Connect for Spark, you can specify external libraries and files in Python UDFs. Snowflake includes Python files and archives in your code’s execution context. You can import functions from these included files in a UDF without additional steps. This dependency-handling behavior works as described in Como criar uma UDF Python com código carregado de um estágio.
To include external libraries and files, you provide stage paths to the files as the value of the configuration setting
snowpark.connect.udf.imports. The configuration value should be an array of stage paths to the files, where the paths are
separated by commas.
Code in the following example includes two files in the UDF’s execution context. The UDF imports functions from these files and uses them in its logic.
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")
@udf(returnType = StringType())
def import_example(input: str) -> str:
from library import first_function
from other_lib.custom import second_function
return first_function(input) + second_function(input)
spark.range(1).select(import_read_example("example_string")).show()
You can use the snowpark.connect.udf.imports setting to include other kinds of files as well, such as those with data your code
needs to read. Note that when you do this, your code should only read from the included files; any writes to such files will be lost after
the function’s execution ends.
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")
@udf(returnType = StringType())
def import_read_example(file_name: str) -> str:
with open(file_name) as f:
return f.read()
spark.range(1).select(import_read_example("data.csv")).show()
Limitações da função lambda¶
Embora Snowpark Connect for Spark ofereça suporte a expressões lambda e funções de ordem superior (como a função transform), não oferece suporte à referência a colunas ou expressões externas de dentro do corpo lambda.
Essa limitação é causada por restrições às expressões lambda no Snowflake.
data = [
(1, [1, 2, 3]),
(2, [4, 5]),
(3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])
df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context
df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
Outra limitação é que as funções definidas pelo usuário (UDFs) não são compatíveis com expressões lambda. Isso inclui UDFs personalizadas e certas funções internas cuja implementação subjacente depende de UDFs do Snowflake. Tentar usar uma UDF dentro de uma expressão lambda resultará em um erro.
spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
Temporary views¶
By default, Snowpark Connect for Spark does not create a temporary view in Snowflake. You can specify that Snowpark Connect for Spark creates a temporary view by
setting the configuration parameter snowpark.connect.temporary.views.create_in_snowflake to true.
If the parameter is set to false, Snowpark Connect for Spark stores views as DataFrames without creating a Snowflake view. This helps to prevent
the issue that can occur when the view definition SQL created from Spark Connect request exceeds Snowflake view size limit (95KB).
Temporary views are normally visible when using Spark Connect Catalog API. However, they are not accessible when called from SQL statements
with configuration snowpark.connect.sql.passthrough set to true. To create Snowflake temporary views, set configuration
snowpark.connect.temporary.views.create_in_snowflake to true.
Fontes de dados¶
Fonte de dados |
Problemas de compatibilidade em comparação com PySpark |
|---|---|
Avro |
O tipo de arquivo não é suportado. |
CSV |
O registro em log não é suportado para o seguinte: As seguintes ações não têm suporte: |
JSON |
O registro em log não é suportado para o seguinte: As seguintes opções não têm suporte: Diferença em |
Orc |
O tipo de arquivo não é suportado. |
Parquet |
O registro em log não é suportado para o seguinte: As seguintes opções não têm suporte: Configuração sem suporte: (ALL) |
Texto |
O registro em log não é suportado para o seguinte: As seguintes opções não têm suporte: O parâmetro |
XML |
O tipo de arquivo não é suportado. |
Tabela do Snowflake |
A gravação na tabela não precisa de um formato de provedor. Colocação em bucket e particionamento não são suportados. O formato de armazenamento e o controle de versão não são suportados. |
Catálogo¶
Suporte ao provedor do Catálogo Snowflake Horizon¶
Somente o Snowflake é compatível como provedor de catálogo.
APIs de catálogo sem suporte¶
registerFunctionlistFunctionsgetFunctionfunctionExistscreateExternalTable
APIs de catálogo parcialmente suportadas¶
createTable(sem suporte para tabela externa)
Iceberg¶
Tabela Iceberg gerenciada pelo Snowflake¶
O Snowpark Connect para Spark funciona com tabelas Apache Iceberg™, incluindo tabelas Iceberg gerenciadas externamente e bancos de dados vinculados a catálogo.
Leitura¶
Time Travel não é suportado, incluindo instantâneo histórico, ramificação e leitura incremental.
Gravação¶
O uso do Spark SQL para criar tabelas não é suportado.
A mesclagem de esquemas não é compatível.
Para criar a tabela, você deve:
Criar um volume externo.
Vincular as necessidades de volume externo à criação da tabela de uma das seguintes maneiras:
Definir o EXTERNAL_VOLUME para o banco de dados.
Definir
snowpark.connect.iceberg.external_volumepara a configuração do Spark.
Tabela Iceberg gerenciada externamente¶
Leitura¶
Você deve criar uma entidade de tabela não gerenciada do Snowflake.
Time Travel não é suportado, incluindo instantâneo histórico, ramificação e leitura incremental.
Gravação¶
Não há suporte para a criação de tabelas.
Há suporte para a gravação na tabela Iceberg existente.
Duplicação de nomes de colunas¶
O Snowflake não oferece suporte a nomes de colunas duplicados.
O código a seguir falha na etapa de criação da exibição com o seguinte erro de compilação SQL: duplicate column name 'foo'.
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Para contornar isso, defina a opção de configuração snowpark.connect.views.duplicate_column_names_handling_mode com um dos seguintes valores:
rename: Um sufixo como_dedup_1,_dedup_2, e assim por diante, será anexado a todos os nomes de colunas duplicadas após o primeiro.drop: Todas as colunas duplicadas, exceto uma, serão descartadas. Isso pode levar a resultados incorretos se as colunas tiverem valores diferentes.