Guia de compatibilidade do Snowpark Connect for Spark¶
Este guia documenta a compatibilidade entre a implementação do Snowpark Connect for Spark de Spark DataFrame APIs e o Apache Spark nativo. O objetivo é ajudar os usuários a entender as principais diferenças, os recursos não suportados e as considerações de migração ao mover cargas de trabalho do Spark para. Snowpark Connect for Spark.
Snowpark Connect for Spark visa fornecer uma experiência familiar do Spark DataFrame API sobre o mecanismo de execução do Snowflake. Entretanto, há lacunas de compatibilidade descritas neste tópico. Este guia destaca essas diferenças para ajudar você a planejar e adaptar sua migração. Esses problemas podem ser abordados em um lançamento futuro.
DataTypes¶
Tipos de dados sem suporte¶
Conversão implícita do tipo de dados¶
Ao usar Snowpark Connect for Spark, tenha em mente como os tipos de dados são tratados. Snowpark Connect for Spark representa implicitamente ByteType
, ShortType`e :code:`IntegerType
como LongType
. Isso significa que, embora você possa definir colunas ou dados com ByteType
, ShortType`ou :code:`IntegerType
, os dados serão representados e retornados por Snowpark Connect for Spark como LongType
. Da mesma forma, a conversão implícita também pode ocorrer para FloatType
e DoubleType
dependendo das operações e do contexto específicos. O mecanismo de execução do Snowflake manipulará internamente a compactação do tipo de dados e poderá, de fato, armazenar os dados como Byte
ou Short
, mas esses são considerados detalhes de implementação e não são expostos ao usuário final.
Semanticamente, esta representação não afetará a exatidão de suas consultas do Spark.
Tipo de dados de PySpark nativo |
Tipo de dados de Snowpark Connect for Spark |
---|---|
|
|
|
|
|
|
|
|
O exemplo a seguir mostra uma diferença entre como o Spark e o Snowpark Connect for Spark manipulam tipos de dados em resultados de consulta.
Consulta¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
(float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
(float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
Nuance de NullType
¶
Snowpark Connect for Spark não oferece suporte ao tipo de dados NullType, que é um tipo de dados compatível no Spark. Isso causa mudanças de comportamento ao usar Null
ou None
em dataframes.
No Spark, um NULL
literal (por exemplo, com lit(None)
) é automaticamente inferido como um NullType
. Em Snowpark Connect for Spark, ele é inferido como um StringType
durante a inferência do esquema.
df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', :code:`NullType`(), True)
# |spconnect|: StructField('null_col', :code:`StringType`(), True)
Tipos de dados estruturados em ArrayType
, MapType
e ObjectType
¶
Embora o suporte a tipos estruturados não esteja disponível por padrão em Snowpark Connect for Spark, os tipos de dados ARRAY
, MAP
e Object
são tratados como coleções genéricas e sem tipo. Isso significa que não há aplicação de tipos de elementos, nomes de campos, esquema ou nulidade, ao contrário do que seria fornecido pelo suporte a tipos estruturados.
Se você tem uma dependência desse suporte, trabalhe com sua equipe de conta para habilitar esse recurso para sua conta.
Spark APIs sem suporte¶
A seguir estão as APIs suportadas pelo Spark clássico e Spark Connect, mas não suportadas no Snowpark Connect for Spark.
Dataframe.hint: Snowpark Connect for Spark ignora qualquer dica definida em um dataframe. O otimizador de consultas do Snowflake determina automaticamente a estratégia de execução mais eficiente.
DataFrame.repartition: Esta é uma operação não permitida em Snowpark Connect for Spark. O Snowflake gerencia automaticamente a distribuição e o particionamento de dados em sua infraestrutura de computação distribuída.
pyspark.RDD: RDD APIs não são compatíveis com o Spark Connect (incluindo Snowpark Connect for Spark).
Diferenças de UDF¶
Diferenças de StructType
¶
Quando o Spark converte um StructType
a ser usado em uma função definida pelo usuário (UDF), ele o converte em um tipo tuple
em Python. Snowpark Connect for Spark converterá um StructType
em um tipo dict
em Python. Isso tem diferenças fundamentais no acesso e na saída do elemento.
O Spark acessará índices com 0, 1, 2, 3 e assim por diante.
Snowpark Connect for Spark acessará índices usando “_1”, “_2” e assim por diante.
def f(e):
return e[0]
df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:
def f(e):
return e['_1']
row = (
self.spark.range(1)
.selectExpr("struct(1, 2) as struct")
.select(
udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
)
.first()
)
self.assertEquals(row[0], Row(col1=1, col2=2))
# Spark: Row(col1=1, col2=2)
# |spconnect|: {'col1': 1, 'col2': 2}
Tipo de iterador em UDFs¶
O iterador não é compatível como tipo de retorno ou como tipo de entrada.
# This will not work
def func(iterator):
for _ in iterator:
...
df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Limitações da função lambda¶
Embora Snowpark Connect for Spark ofereça suporte a expressões lambda e funções de ordem superior (como a função transform
), não oferece suporte à referência a colunas ou expressões externas de dentro do corpo lambda.
Essa limitação é causada por restrições às expressões lambda no Snowflake.
data = [
(1, [1, 2, 3]),
(2, [4, 5]),
(3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])
df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context
df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
Outra limitação é que as funções definidas pelo usuário (UDFs) não são compatíveis com expressões lambda. Isso inclui UDFs personalizadas e certas funções internas cuja implementação subjacente depende de UDFs do Snowflake. Tentar usar uma UDF dentro de uma expressão lambda resultará em um erro.
spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
Fontes de dados¶
Fonte de dados |
Problemas de compatibilidade em comparação com PySpark |
---|---|
Avro |
O tipo de arquivo não é suportado. |
CSV |
O registro em log não é suportado para o seguinte: As seguintes ações não têm suporte: |
JSON |
O registro em log não é suportado para o seguinte: As seguintes opções não têm suporte: Diferença em |
Orc |
O tipo de arquivo não é suportado. |
Parquet |
O registro em log não é suportado para o seguinte: As seguintes opções não têm suporte: Configuração sem suporte: (ALL) |
Texto |
O registro em log não é suportado para o seguinte: As seguintes opções não têm suporte: O parâmetro |
XML |
O tipo de arquivo não é suportado. |
Tabela do Snowflake |
A gravação na tabela não precisa de um formato de provedor. Colocação em bucket e particionamento não são suportados. O formato de armazenamento e o controle de versão não são suportados. |
Catálogo¶
Suporte ao provedor do Catálogo Snowflake Horizon¶
Somente o Snowflake é compatível como provedor de catálogo.
APIs de catálogo sem suporte¶
registerFunction
listFunctions
getFunction
functionExists
createExternalTable
APIs de catálogo parcialmente suportadas¶
createTable
(sem suporte para tabela externa)
Iceberg¶
Tabela Iceberg gerenciada pelo Snowflake¶
O Snowpark Connect para Spark funciona com tabelas Apache Iceberg™, incluindo tabelas Iceberg gerenciadas externamente e bancos de dados vinculados a catálogo.
Leitura¶
Time Travel não é suportado, incluindo instantâneo histórico, ramificação e leitura incremental.
Gravação¶
O uso do Spark SQL para criar tabelas não é suportado.
A mesclagem de esquemas não é compatível.
Para criar a tabela, você deve:
Criar um volume externo.
Vincular as necessidades de volume externo à criação da tabela de uma das seguintes maneiras:
Definir o EXTERNAL_VOLUME para o banco de dados.
Definir
snowpark.connect.iceberg.external_volume
para a configuração do Spark.
Tabela Iceberg gerenciada externamente¶
Leitura¶
Você deve criar uma entidade de tabela não gerenciada do Snowflake.
Time Travel não é suportado, incluindo instantâneo histórico, ramificação e leitura incremental.
Gravação¶
Não há suporte para a criação de tabelas.
Há suporte para a gravação na tabela Iceberg existente.
Duplicação de nomes de colunas¶
O Snowflake não oferece suporte a nomes de colunas duplicados.
O código a seguir falha na etapa de criação da exibição com o seguinte erro de compilação SQL: duplicate column name 'foo'
.
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Para contornar isso, defina a opção de configuração snowpark.connect.views.duplicate_column_names_handling_mode
com um dos seguintes valores:
rename
: Um sufixo como_dedup_1
,_dedup_2
, e assim por diante, será anexado a todos os nomes de colunas duplicadas após o primeiro.drop
: Todas as colunas duplicadas, exceto uma, serão descartadas. Isso pode levar a resultados incorretos se as colunas tiverem valores diferentes.