Guia de compatibilidade do Snowpark Connect for Spark

Este guia documenta a compatibilidade entre a implementação do Snowpark Connect for Spark de Spark DataFrame APIs e o Apache Spark nativo. O objetivo é ajudar os usuários a entender as principais diferenças, os recursos não suportados e as considerações de migração ao mover cargas de trabalho do Spark para. Snowpark Connect for Spark.

Snowpark Connect for Spark visa fornecer uma experiência familiar do Spark DataFrame API sobre o mecanismo de execução do Snowflake. Entretanto, há lacunas de compatibilidade descritas neste tópico. Este guia destaca essas diferenças para ajudar você a planejar e adaptar sua migração. Esses problemas podem ser abordados em um lançamento futuro.

DataTypes

Tipos de dados sem suporte

Conversão implícita do tipo de dados

Ao usar Snowpark Connect for Spark, tenha em mente como os tipos de dados são tratados. Snowpark Connect for Spark representa implicitamente ByteType, ShortType`e :code:`IntegerType como LongType. Isso significa que, embora você possa definir colunas ou dados com ByteType, ShortType`ou :code:`IntegerType, os dados serão representados e retornados por Snowpark Connect for Spark como LongType. Da mesma forma, a conversão implícita também pode ocorrer para FloatType e DoubleType dependendo das operações e do contexto específicos. O mecanismo de execução do Snowflake manipulará internamente a compactação do tipo de dados e poderá, de fato, armazenar os dados como Byte ou Short, mas esses são considerados detalhes de implementação e não são expostos ao usuário final.

Semanticamente, esta representação não afetará a exatidão de suas consultas do Spark.

Tipo de dados de PySpark nativo

Tipo de dados de Snowpark Connect for Spark

ByteType

LongType

ShortType

LongType

IntegerType

LongType

LongType

LongType

O exemplo a seguir mostra uma diferença entre como o Spark e o Snowpark Connect for Spark manipulam tipos de dados em resultados de consulta.

Consulta

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
    (float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
    (float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
    AS tab(a, b, c, d, e, f)
    """
Copy

Spark

spark.sql(query).printSchema()
Copy
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
Copy
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

Nuance de NullType

Snowpark Connect for Spark não oferece suporte ao tipo de dados NullType, que é um tipo de dados compatível no Spark. Isso causa mudanças de comportamento ao usar Null ou None em dataframes.

No Spark, um NULL literal (por exemplo, com lit(None)) é automaticamente inferido como um NullType. Em Snowpark Connect for Spark, ele é inferido como um StringType durante a inferência do esquema.

df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', :code:`NullType`(), True)
# |spconnect|: StructField('null_col', :code:`StringType`(), True)
Copy

Tipos de dados estruturados em ArrayType, MapType e ObjectType

Embora o suporte a tipos estruturados não esteja disponível por padrão em Snowpark Connect for Spark, os tipos de dados ARRAY, MAP e Object são tratados como coleções genéricas e sem tipo. Isso significa que não há aplicação de tipos de elementos, nomes de campos, esquema ou nulidade, ao contrário do que seria fornecido pelo suporte a tipos estruturados.

Se você tem uma dependência desse suporte, trabalhe com sua equipe de conta para habilitar esse recurso para sua conta.

Spark APIs sem suporte

A seguir estão as APIs suportadas pelo Spark clássico e Spark Connect, mas não suportadas no Snowpark Connect for Spark.

  • Dataframe.hint: Snowpark Connect for Spark ignora qualquer dica definida em um dataframe. O otimizador de consultas do Snowflake determina automaticamente a estratégia de execução mais eficiente.

  • DataFrame.repartition: Esta é uma operação não permitida em Snowpark Connect for Spark. O Snowflake gerencia automaticamente a distribuição e o particionamento de dados em sua infraestrutura de computação distribuída.

  • pyspark.RDD: RDD APIs não são compatíveis com o Spark Connect (incluindo Snowpark Connect for Spark).

  • pyspark.ml

  • pyspark streaming

Diferenças de UDF

Diferenças de StructType

Quando o Spark converte um StructType a ser usado em uma função definida pelo usuário (UDF), ele o converte em um tipo tuple em Python. Snowpark Connect for Spark converterá um StructType em um tipo dict em Python. Isso tem diferenças fundamentais no acesso e na saída do elemento.

  • O Spark acessará índices com 0, 1, 2, 3 e assim por diante.

  • Snowpark Connect for Spark acessará índices usando “_1”, “_2” e assim por diante.

def f(e):
    return e[0]

    df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
    result = df.select("*", udf(f, DoubleType())("c1"))

# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:

def f(e):
    return e['_1']

row = (
    self.spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

self.assertEquals(row[0], Row(col1=1, col2=2))

# Spark: Row(col1=1, col2=2)

# |spconnect|: {'col1': 1, 'col2': 2}
Copy

Tipo de iterador em UDFs

O iterador não é compatível como tipo de retorno ou como tipo de entrada.

# This will not work
def func(iterator):
  for _ in iterator:
              ...

df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Copy

Limitações da função lambda

Embora Snowpark Connect for Spark ofereça suporte a expressões lambda e funções de ordem superior (como a função transform), não oferece suporte à referência a colunas ou expressões externas de dentro do corpo lambda.

Essa limitação é causada por restrições às expressões lambda no Snowflake.

data = [
    (1, [1, 2, 3]),
    (2, [4, 5]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])

df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context

df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
Copy

Outra limitação é que as funções definidas pelo usuário (UDFs) não são compatíveis com expressões lambda. Isso inclui UDFs personalizadas e certas funções internas cuja implementação subjacente depende de UDFs do Snowflake. Tentar usar uma UDF dentro de uma expressão lambda resultará em um erro.

spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
Copy

Fontes de dados

Fonte de dados

Problemas de compatibilidade em comparação com PySpark

Avro

O tipo de arquivo não é suportado.

CSV

O registro em log não é suportado para o seguinte: Append, Ignore.

As seguintes ações não têm suporte: encoding, quote, quoteAll, escape, escapeQuotes, comment, preferDate, enforceSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nanValue, positiveInf, negativeInf, timestampNTZFormat, enableDateTimeParsingFallback, maxColumns, maxCharsPerColumn, mode, columnNameOfCorruptRecord, charToEscapeQuoteEscaping, samplingRatio, emptyValue, locale, lineSep, unescapedQuoteHandling, compression.

JSON

O registro em log não é suportado para o seguinte: Append, Ignore.

As seguintes opções não têm suporte: timeZone, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZeros, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, timestampNTZFormat, enableDateTimeParsingFallback, allowUnquotedControlChars, encoding, lineSep, samplingRatio, dropFieldIfAllNull, locale, allowNonNumericNumbers, compression, ignoreNullFields.

Diferença em Show: Se o valor do campo fosse uma cadeia de caracteres, ela seria colocada entre aspas. Um caractere extra «n» seria mostrado no resultado.

Orc

O tipo de arquivo não é suportado.

Parquet

O registro em log não é suportado para o seguinte: Append, Ignore.

As seguintes opções não têm suporte: datetimeRebaseMode, int96RebaseMode, mergeSchema, compression.

Configuração sem suporte: (ALL)

Texto

O registro em log não é suportado para o seguinte: Append, Ignore.

As seguintes opções não têm suporte: compression.

O parâmetro lineSep não é compatível com a gravação.

XML

O tipo de arquivo não é suportado.

Tabela do Snowflake

A gravação na tabela não precisa de um formato de provedor.

Colocação em bucket e particionamento não são suportados.

O formato de armazenamento e o controle de versão não são suportados.

Catálogo

Suporte ao provedor do Catálogo Snowflake Horizon

  • Somente o Snowflake é compatível como provedor de catálogo.

APIs de catálogo sem suporte

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

APIs de catálogo parcialmente suportadas

  • createTable (sem suporte para tabela externa)

Iceberg

Tabela Iceberg gerenciada pelo Snowflake

O Snowpark Connect para Spark funciona com tabelas Apache Iceberg™, incluindo tabelas Iceberg gerenciadas externamente e bancos de dados vinculados a catálogo.

Leitura

Time Travel não é suportado, incluindo instantâneo histórico, ramificação e leitura incremental.

Gravação

  • O uso do Spark SQL para criar tabelas não é suportado.

  • A mesclagem de esquemas não é compatível.

  • Para criar a tabela, você deve:

    • Criar um volume externo.

    • Vincular as necessidades de volume externo à criação da tabela de uma das seguintes maneiras:

      • Definir o EXTERNAL_VOLUME para o banco de dados.

      • Definir snowpark.connect.iceberg.external_volume para a configuração do Spark.

Tabela Iceberg gerenciada externamente

Leitura

  • Você deve criar uma entidade de tabela não gerenciada do Snowflake.

  • Time Travel não é suportado, incluindo instantâneo histórico, ramificação e leitura incremental.

Gravação

  • Não há suporte para a criação de tabelas.

  • Há suporte para a gravação na tabela Iceberg existente.

Duplicação de nomes de colunas

O Snowflake não oferece suporte a nomes de colunas duplicados.

O código a seguir falha na etapa de criação da exibição com o seguinte erro de compilação SQL: duplicate column name 'foo'.

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

Para contornar isso, defina a opção de configuração snowpark.connect.views.duplicate_column_names_handling_mode com um dos seguintes valores:

  • rename: Um sufixo como _dedup_1, _dedup_2, e assim por diante, será anexado a todos os nomes de colunas duplicadas após o primeiro.

  • drop: Todas as colunas duplicadas, exceto uma, serão descartadas. Isso pode levar a resultados incorretos se as colunas tiverem valores diferentes.