Guia de compatibilidade do Snowpark Connect for Spark¶
Este guia documenta a compatibilidade entre a implementação do Snowpark Connect for Spark de Spark DataFrame APIs e o Apache Spark nativo. O objetivo é ajudar os usuários a entender as principais diferenças, os recursos não suportados e as considerações de migração ao mover cargas de trabalho do Spark para. Snowpark Connect for Spark.
Snowpark Connect for Spark visa fornecer uma experiência familiar do Spark DataFrame API sobre o mecanismo de execução do Snowflake. Entretanto, há lacunas de compatibilidade descritas neste tópico. Este guia destaca essas diferenças para ajudar você a planejar e adaptar sua migração. Esses problemas podem ser abordados em um lançamento futuro.
DataTypes¶
Tipos de dados sem suporte¶
Conversão implícita do tipo de dados¶
Ao usar Snowpark Connect for Spark, tenha em mente como os tipos de dados são tratados. Snowpark Connect for Spark representa implicitamente ByteType, ShortType`e :code:`IntegerType como LongType. Isso significa que, embora você possa definir colunas ou dados com ByteType, ShortType`ou :code:`IntegerType, os dados serão representados e retornados por Snowpark Connect for Spark como LongType. Da mesma forma, a conversão implícita também pode ocorrer para FloatType e DoubleType dependendo das operações e do contexto específicos. O mecanismo de execução do Snowflake manipulará internamente a compactação do tipo de dados e poderá, de fato, armazenar os dados como Byte ou Short, mas esses são considerados detalhes de implementação e não são expostos ao usuário final.
Semanticamente, esta representação não afetará a exatidão de suas consultas do Spark.
Tipo de dados de PySpark nativo |
Tipo de dados de Snowpark Connect for Spark |
|---|---|
|
|
|
|
|
|
|
|
O exemplo a seguir mostra uma diferença entre como o Spark e o Snowpark Connect for Spark manipulam tipos de dados em resultados de consulta.
Consulta¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
(float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
(float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
Nuance de NullType¶
Snowpark Connect for Spark não oferece suporte ao tipo de dados NullType, que é um tipo de dados compatível no Spark. Isso causa mudanças de comportamento ao usar Null ou None em dataframes.
No Spark, um NULL literal (por exemplo, com lit(None)) é automaticamente inferido como um NullType. Em Snowpark Connect for Spark, ele é inferido como um StringType durante a inferência do esquema.
df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', :code:`NullType`(), True)
# Snowpark Connect for Spark: StructField('null_col', :code:`StringType`(), True)
Tipos de dados estruturados em ArrayType, MapType e ObjectType¶
Embora o suporte a tipos estruturados não esteja disponível por padrão em Snowpark Connect for Spark, os tipos de dados ARRAY, MAP e Object são tratados como coleções genéricas e sem tipo. Isso significa que não há aplicação de tipos de elementos, nomes de campos, esquema ou nulidade, ao contrário do que seria fornecido pelo suporte a tipos estruturados.
Se você tem uma dependência desse suporte, trabalhe com sua equipe de conta para habilitar esse recurso para sua conta.
Spark APIs sem suporte¶
A seguir estão as APIs suportadas pelo Spark clássico e Spark Connect, mas não suportadas no Snowpark Connect for Spark.
Dataframe.hint: Snowpark Connect for Spark ignora qualquer dica definida em um dataframe. O otimizador de consultas do Snowflake determina automaticamente a estratégia de execução mais eficiente.
DataFrame.repartition: Esta é uma operação não permitida em Snowpark Connect for Spark. O Snowflake gerencia automaticamente a distribuição e o particionamento de dados em sua infraestrutura de computação distribuída.
pyspark.RDD: RDD APIs não são compatíveis com o Spark Connect (incluindo Snowpark Connect for Spark).
Diferenças de UDF¶
Diferenças de StructType¶
Quando o Spark converte um StructType a ser usado em uma função definida pelo usuário (UDF), ele o converte em um tipo tuple em Python. Snowpark Connect for Spark converterá um StructType em um tipo dict em Python. Isso tem diferenças fundamentais no acesso e na saída do elemento.
O Spark acessará índices com 0, 1, 2, 3 e assim por diante.
Snowpark Connect for Spark acessará índices usando “_1”, “_2” e assim por diante.
def f(e):
return e[0]
df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:
def f(e):
return e['_1']
row = (
self.spark.range(1)
.selectExpr("struct(1, 2) as struct")
.select(
udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
)
.first()
)
self.assertEquals(row[0], Row(col1=1, col2=2))
# Spark: Row(col1=1, col2=2)
# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
Tipo de iterador em UDFs¶
O iterador não é compatível como tipo de retorno ou como tipo de entrada.
# This will not work
def func(iterator):
for _ in iterator:
...
df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Importação de arquivos para UDF Python¶
Com o Snowpark Connect for Spark, você pode especificar bibliotecas e arquivos externos em UDFs Python. O Snowflake inclui arquivos Python e arquivamentos no contexto de execução do seu código. Você pode importar funções desses arquivos incluídos em uma UDF sem etapas adicionais. Esse comportamento de tratamento de dependência funciona conforme descrito em Como criar uma UDF Python com código carregado de um estágio.
Para incluir bibliotecas e arquivos externos, você fornece caminhos de área de preparação para os arquivos como valor da definição de configuração snowpark.connect.udf.imports. O valor da configuração deve ser uma matriz de caminhos de área de preparação para os arquivos, com os caminhos separados por vírgulas.
O código no exemplo a seguir inclui dois arquivos no contexto de execução da UDF. A UDF importa funções desses arquivos e as utiliza na lógica.
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")
@udf(returnType = StringType())
def import_example(input: str) -> str:
from library import first_function
from other_lib.custom import second_function
return first_function(input) + second_function(input)
spark.range(1).select(import_read_example("example_string")).show()
Você pode usar a configuração snowpark.connect.udf.imports para incluir também outros tipos de arquivos, como aqueles com dados que seu código precisa ler. Quando você faz isso, observe que seu código deve ler apenas os arquivos incluídos, qualquer gravação nesses arquivos será perdida após o término da execução da função.
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")
@udf(returnType = StringType())
def import_read_example(file_name: str) -> str:
with open(file_name) as f:
return f.read()
spark.range(1).select(import_read_example("data.csv")).show()
Limitações da função lambda¶
User-defined functions (UDFs) are not supported within lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression will result in an error.
df = spark.createDataFrame([({"a": 123},)], ("data",))
df.select(map_filter("data", lambda _, v: bit_count(v) > 3)).show() # does not work, since `bit_count` is implemented with UDF
Exibições temporárias¶
Por padrão, o Snowpark Connect for Spark não cria uma exibição temporária no Snowflake. Você pode especificar que o Snowpark Connect for Spark crie uma exibição temporária definindo o parâmetro de configuração snowpark.connect.temporary.views.create_in_snowflake como true.
Se o parâmetro for definido como false, o Snowpark Connect for Spark armazenará as exibições como DataFrames sem criar uma exibição do Snowflake. Isso ajuda a evitar o problema que pode ocorrer quando o SQL da definição da exibição criado com base na solicitação do Spark Connect excede o limite de tamanho de exibição do Snowflake (95KB).
Normalmente, as exibições temporárias estão visíveis ao usar a Spark Connect Catalog API. Entretanto, elas não são acessíveis quando chamadas de instruções SQL com a configuração snowpark.connect.sql.passthrough definir como true. Para criar exibições temporárias no Snowflake, defina a configuração snowpark.connect.temporary.views.create_in_snowflake como true.
Fontes de dados¶
Fonte de dados |
Problemas de compatibilidade em comparação com PySpark |
|---|---|
Avro |
O tipo de arquivo não é suportado. |
CSV |
O registro em log não é suportado para o seguinte: As seguintes ações não têm suporte: |
JSON |
O registro em log não é suportado para o seguinte: As seguintes opções não têm suporte: Diferença em |
Orc |
O tipo de arquivo não é suportado. |
Parquet |
O registro em log não é suportado para o seguinte: As seguintes opções não têm suporte: Configuração sem suporte: (ALL) |
Texto |
O registro em log não é suportado para o seguinte: As seguintes opções não têm suporte: O parâmetro |
XML |
O tipo de arquivo não é suportado. |
Tabela do Snowflake |
A gravação na tabela não precisa de um formato de provedor. Colocação em bucket e particionamento não são suportados. O formato de armazenamento e o controle de versão não são suportados. |
Catálogo¶
Suporte ao provedor do Catálogo Snowflake Horizon¶
Somente o Snowflake é compatível como provedor de catálogo.
APIs de catálogo sem suporte¶
registerFunctionlistFunctionsgetFunctionfunctionExistscreateExternalTable
APIs de catálogo parcialmente suportadas¶
createTable(sem suporte para tabela externa)
Iceberg¶
Tabela Iceberg gerenciada pelo Snowflake¶
O Snowpark Connect para Spark funciona com tabelas Apache Iceberg™, incluindo tabelas Iceberg gerenciadas externamente e bancos de dados vinculados a catálogo.
Leitura¶
Time Travel não é suportado, incluindo instantâneo histórico, ramificação e leitura incremental.
Gravação¶
O uso do Spark SQL para criar tabelas não é suportado.
A mesclagem de esquemas não é compatível.
Para criar a tabela, você deve:
Criar um volume externo.
Vincular as necessidades de volume externo à criação da tabela de uma das seguintes maneiras:
Definir o EXTERNAL_VOLUME para o banco de dados.
Definir
snowpark.connect.iceberg.external_volumepara a configuração do Spark.
Tabela Iceberg gerenciada externamente¶
Leitura¶
Você deve criar uma entidade de tabela não gerenciada do Snowflake.
Time Travel não é suportado, incluindo instantâneo histórico, ramificação e leitura incremental.
Gravação¶
Não há suporte para a criação de tabelas.
Há suporte para a gravação na tabela Iceberg existente.
Duplicação de nomes de colunas¶
O Snowflake não oferece suporte a nomes de colunas duplicados.
O código a seguir falha na etapa de criação da exibição com o seguinte erro de compilação SQL: duplicate column name 'foo'.
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Para contornar isso, defina a opção de configuração snowpark.connect.views.duplicate_column_names_handling_mode com um dos seguintes valores:
rename: Um sufixo como_dedup_1,_dedup_2, e assim por diante, será anexado a todos os nomes de colunas duplicadas após o primeiro.drop: Todas as colunas duplicadas, exceto uma, serão descartadas. Isso pode levar a resultados incorretos se as colunas tiverem valores diferentes.