Propriedades do Snowpark Connect for Spark

O Snowpark Connect for Spark oferece suporte à configuração personalizada de forma semelhante ao Spark padrão. Você pode modificar as propriedades de configuração somente por meio do método set da sessão usando um par chave-valor. Observe que o Snowpark Connect for Spark reconhece apenas um conjunto limitado de propriedades que influenciam a execução. Todas as propriedades incompatíveis são silenciosamente ignoradas sem gerar uma exceção.

Propriedades do Spark compatíveis

O Snowpark Connect for Spark oferece suporte a um subconjunto de propriedades do Spark.

Nome da propriedade

Padrão

Significado

Desde

spark.app.name

(nenhum)

Nome do aplicativo definido como query_tag do Snowflake (Spark-Connect-App-Name={name}) para rastreamento de consultas.

1.0.0

spark.Catalog.databaseFilterInformationSchema

false

Quando true, filtra INFORMATION_SCHEMA das listagens de banco de dados nas operações de catálogo.

1.0.0

spark.hadoop.fs.s3a.access.key

(nenhum)

ID da chave de acesso da AWS para autenticação no S3 ao ler ou gravar em locais do S3.

1.0.0

spark.hadoop.fs.s3a.assumed.role.arn

(nenhum)

ARN da função do IAM da AWS com acesso ao S3 ao usar autenticação baseada em função.

1.0.0

spark.hadoop.fs.s3a.secret.key

(nenhum)

Chave de acesso secreta da AWS para autenticação no S3 ao ler ou gravar em locais do S3.

1.0.0

spark.hadoop.fs.s3a.server-side-encryption.key

(nenhum)

ID da chave KMS da AWS para criptografia do lado do servidor ao usar o tipo de criptografia AWS_SSE_KMS.

1.0.0

spark.hadoop.fs.s3a.session.token

(nenhum)

Token de sessão da AWS para credenciais temporárias do S3 ao usar STS.

1.0.0

spark.sql.ansi.enabled

false

Habilita o modo ANSI SQL para verificação de tipo e tratamento de erros mais rigorosos. Quando true, estouros aritméticos e conversões inválidas geram erros em vez de retornar NULL.

1.0.0

spark.sql.caseSensitive

false

Controla a diferenciação de letras maiúsculas e minúsculas para identificadores. Quando false, os nomes de colunas e tabelas não diferenciam maiúsculas de minúsculas (colocados automaticamente em maiúsculas no Snowflake).

1.0.0

spark.sql.crossJoin.enabled

true

Habilita ou desabilita junções cruzadas implícitas. Um false ou uma condição de junção ausente ou trivial resultará em erro.

1.0.0

spark.sql.execution.pythonUDTF.arrow.enabled

false

Quando true, habilita a otimização do Apache Arrow para serialização/desserialização de UDTF Python.

1.0.0

spark.sql.globalTempDatabase

global_temp

Nome do esquema para exibições temporárias globais. Criado automaticamente se não existir.

1.0.0

spark.sql.legacy.allowHashOnMapType

false

Quando true, permite hashing de colunas do tipo MAP. Por padrão, os tipos MAP não podem passar por hashing para consistência com o comportamento do Spark.

1.0.0

spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue

false

Comportamento legado para nomeação de chaves de agrupamento de conjuntos de dados.

1.6.0

spark.sql.mapKeyDedupPolicy

EXCEPTION

Controla o comportamento quando chaves duplicadas são encontradas na criação do mapa. Valores: EXCEPTION (gerar erro) ou LAST_WIN (manter o último valor).

1.0.0

spark.sql.parser.quotedRegexColumnNames

false

Quando true, habilita a correspondência de padrão regex em nomes de colunas entre aspas nas consultas SQL (por exemplo, SELECT '(col1|col2)' FROM table).

1.0.0

spark.sql.parquet.outputTimestampType

TIMESTAMP_MILLIS

Controla o tipo de carimbo de data/hora da saída do Parquet. Oferece suporte a TIMESTAMP_MILLIS ou TIMESTAMP_MICROS.

1.7.0

spark.sql.pyspark.inferNestedDictAsStruct.enabled

false

Quando true, infere dicionários Python aninhados como StructType em vez de MapType durante a criação de DataFrame.

1.0.0

spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled

false

Quando true, infere o tipo de elemento da matriz apenas a partir do primeiro elemento, em vez de fazer amostragem de todos os elementos.

1.0.0

spark.sql.repl.eagerEval.enabled

false

Quando true, habilita a avaliação adiantada em REPL mostrando os resultados de DataFrame automaticamente sem chamar show().

1.0.0

spark.sql.repl.eagerEval.maxNumRows

20

Número máximo de linhas a serem exibidas no modo de avaliação adiantada REPL.

1.0.0

spark.sql.repl.eagerEval.truncate

20

Largura máxima para valores de coluna na exibição de avaliação adiantada REPL antes do truncamento.

1.0.0

spark.sql.session.localRelationCacheThreshold

2147483647

Limite de bytes para armazenar em cache relações locais. Relações maiores que esse valor são armazenadas em cache para melhorar o desempenho.

1.0.0

spark.sql.session.timeZone

<system_local_timezone>

Fuso horário da sessão usado para operações de carimbo de data/hora. Sincronizado com a sessão do Snowflake via ALTER SESSION SET TIMEZONE.

1.0.0

spark.sql.sources.default

parquet

Formato padrão da fonte de dados para operações de leitura/gravação quando o formato não é especificado explicitamente.

1.0.0

spark.sql.timestampType

TIMESTAMP_LTZ

Tipo de carimbo de data/hora padrão para operações de carimbo de data/hora. Valores: TIMESTAMP_LTZ (com fuso horário local) ou TIMESTAMP_NTZ (sem fuso horário).

1.0.0

spark.sql.tvf.allowMultipleTableArguments.enabled

true

Quando true, permite que funções com valor de tabela aceitem vários argumentos de tabela.

1.0.0

Propriedades do Snowpark Connect for Spark compatíveis

Propriedades de configuração personalizada específicas do Snowpark Connect for Spark.

Nome da propriedade

Padrão

Significado

Desde

fs.azure.sas.<container>.<account>.blob.core.windows.net

(nenhum)

Token do SAS do Azure para autenticação do Armazenamento de Blobs. Usado ao ler ou gravar em locais do Armazenamento de Blobs do Azure.

1.0.0

fs.azure.sas.fixed.token.<account>.dfs.core.windows.net

(nenhum)

Token do SAS do Azure para autenticação do ADLS Gen2 (Data Lake Storage). Usado ao ler ou gravar em locais do Azure Data Lake Storage Gen2.

1.0.0

mapreduce.fileoutputcommitter.marksuccessfuljobs

false

Quando true, gera um arquivo _SUCCESS após operações de gravação bem-sucedidas para compatibilidade com fluxos de trabalho do Hadoop/Spark.

1.0.0

parquet.enable.summary-metadata

false

Configuração alternativa para gerar arquivos de metadados de resumo do Parquet. Esta configuração ou spark.sql.parquet.enable.summary-metadata habilita o recurso.

1.4.0

snowflake.repartition.for.writes

false

Quando true, força DataFrame.repartition(n) a dividir a saída em arquivos n durante as gravações. Corresponde ao comportamento do Spark, mas adiciona sobrecarga.

1.0.0

snowpark.connect.cte.optimization_enabled

false

Quando true, habilita a otimização da Expressão de Tabela Comum (Common Table Expression, CTE) nas sessões do Snowpark para melhorar o desempenho das consultas.

1.0.0

snowpark.connect.describe_cache_ttl_seconds

300

Tempo de vida em segundos para entradas do cache de consulta. Reduz pesquisas repetidas de esquema.

1.0.0

snowpark.connect.enable_snowflake_extension_behavior

false

Quando true, habilita extensões específicas do Snowflake que podem ser diferentes do comportamento do Spark (como hashing em tipos MAP ou tipo de retorno MD5).

1.0.0

snowpark.connect.handleIntegralOverflow

false

Quando true, o comportamento de estouro integral é alinhado à abordagem do Spark.

1.7.0

snowpark.connect.iceberg.external_volume

(nenhum)

Nome do volume externo do Snowflake para operações de tabela Iceberg.

1.0.0

snowpark.connect.integralTypesEmulation

client_default

Controla a conversão de tipos decimais em integrais. Valores: client_default, enabled, disabled

1.7.0

snowpark.connect.scala.version

2.12

Controla a versão do Scala usada (com suporte para 2.12 ou 2.13)

1.7.0

snowpark.connect.sql.partition.external_table_location

(nenhum)

Caminho do local da tabela externa para gravações particionadas.

1.4.0

snowpark.connect.temporary.views.create_in_snowflake

false

Quando true, cria exibições temporárias diretamente no Snowflake em vez de gerenciá-las localmente.

1.0.1

snowpark.connect.udf.imports [DEPRECATED 1.7.0]

(nenhum)

Lista separada por vírgula de arquivos ou módulos a serem importados para execução de UDF. Aciona a recriação de UDF quando alterada.

1.0.0

snowpark.connect.udf.python.imports

(nenhum)

Lista separada por vírgula de arquivos/módulos a serem importados para execução de UDF em Python. Aciona a recriação de UDF quando alterada.

1.7.0

snowpark.connect.udf.java.imports

(nenhum)

Lista separada por vírgula de arquivos ou módulos a serem importados para execução de UDF em Java. Aciona a recriação de UDF quando alterada.

1.7.0

snowpark.connect.udf.packages

(nenhum)

Lista separada por vírgula de pacotes Python a serem incluídos ao registrar UDFs.

1.0.0

snowpark.connect.udtf.compatibility_mode

false

Quando true, habilita o comportamento de UDTF compatível com Spark para melhor compatibilidade com a semântica de UDTF do Spark.

1.0.0

snowpark.connect.version

<current_version>

Somente leitura. Retorna a versão atual do Snowpark Connect for Spark.

1.0.0

snowpark.connect.views.duplicate_column_names_handling_mode

rename

Como tratar nomes de colunas duplicados em exibições. Valores: rename (adicionar sufixo), fail (gerar erro) ou drop (remover duplicatas).

1.0.0

spark.sql.parquet.enable.summary-metadata

false

Quando true, gera arquivos de metadados de resumo do Parquet (_metadata _common_metadata) durante as gravações do Parquet.

1.4.0

fs.azure.sas.<container>.<account>.blob.core.windows.net

Especifica o token do SAS do Azure para autenticação do Armazenamento de Blobs. Usado ao ler ou gravar em locais do Armazenamento de Blobs do Azure.

Padrão: (nenhum)

Desde: 1.0.0

fs.azure.sas.fixed.token.<account>.dfs.core.windows.net

Especifica o token do SAS do Azure para autenticação do ADLS Gen2 (Data Lake Storage). Usado ao ler ou gravar em locais do Azure Data Lake Storage Gen2.

Padrão: (nenhum)

Desde: 1.0.0

mapreduce.fileoutputcommitter.marksuccessfuljobs

Especifica true para gerar um arquivo _SUCCESS após operações de gravação bem-sucedidas para compatibilidade com fluxos de trabalho do Hadoop ou Spark.

Padrão: false

Desde: 1.0.0

parquet.enable.summary-metadata

Especifica a configuração alternativa para gerar arquivos de metadados de resumo do Parquet. Habilita o recurso com esta propriedade ou spark.sql.parquet.enable.summary-metadata.

Padrão: false

Desde: 1.4.0

snowflake.repartition.for.writes

Especifique true para forçar DataFrame.repartition(n) a dividir a saída em arquivos n durante as gravações. Corresponde ao comportamento do Spark, mas adiciona sobrecarga.

Padrão: false

Desde: 1.0.0

snowpark.connect.cte.optimization_enabled

Especifique true para habilitar a otimização da Expressão de Tabela Comum (Common Table Expression, CTE) na sessão do Snowpark para desempenho das consultas.

Padrão: false

Desde: 1.0.0

Comentários

Configuração que habilita as Expressões de Tabela Comuns do Snowflake (CTEs). Essa configuração otimiza as consultas do Snowflake nas quais há muitos blocos de código repetitivos. Essa modificação vai melhorar tanto a compilação de consultas quanto o desempenho da execução.

snowpark.connect.describe_cache_ttl_seconds

Especifica o tempo de vida, em segundos, para entradas do cache de consulta. Reduz pesquisas repetidas de esquema.

Padrão: 300

Desde: 1.0.0

snowpark.connect.enable_snowflake_extension_behavior

Especifique true para habilitar extensões específicas do Snowflake que podem ser diferentes do comportamento do Spark (como hashing em tipo de retorno MD5 para tipos MAP).

Padrão: false

Desde: 1.0.0

Comentários

Quando definida como true, altera o comportamento de determinadas operações:

snowpark.connect.handleIntegralOverflow

Especifique true para alinhar o comportamento de estouro integral à abordagem do Spark.

Padrão: false

Desde: 1.7.0

snowpark.connect.iceberg.external_volume

Especifica o nome do volume externo do Snowflake para operações de tabela Iceberg.

Padrão: (nenhum)

Desde: 1.0.0

snowpark.connect.integralTypesEmulation

Especifica como converter tipos decimais em integrais. Valores: client_default, enabled, disabled

Padrão: client_default

Desde: 1.7.0

Comentários

Por padrão, o Snowpark Connect for Spark trata todos os tipos integrais como Long. Isso é causado pela maneira como os números são representados no Snowflake. A emulação de tipos integrais permite um mapeamento exato entre os tipos do Snowpark e do Spark ao ler fontes de dados.

A opção padrão client_default ativa a emulação somente quando o script é executado do cliente Scala. Os tipos integrais são mapeados com base nas seguintes precisões:

Precisão

Tipo do Spark

19

LongType

10

IntegerType

5

ShortType

3

ByteType

Outros

DecimalType(precision, 0)

Quando outras precisões são encontradas, o tipo final é mapeado para o DecimalType.

snowpark.connect.scala.version

Especifica a versão do Scala a ser usada (compatível com 2.12 ou 2.13).

Padrão: 2.12

Desde: 1.7.0

snowpark.connect.sql.partition.external_table_location

Especifica o caminho do local da tabela externa para gravações particionadas.

Padrão: (nenhum)

Desde: 1.4.0

Comentários

Para ler apenas um subconjunto exato de arquivos particionados do diretório fornecido, é necessária uma configuração adicional. Este recurso só está disponível para arquivos armazenados em áreas de preparação externas. Para remover os arquivos de leitura, o Snowpark Connect for Spark usa tabelas externas.

Este recurso é habilitado quando a configuração snowpark.connect.sql.partition.external_table_location é definida. Ele deve conter os nomes existentes de bancos de dados e esquemas em que as tabelas externas serão criadas.

A leitura de arquivos Parquet armazenados em áreas de preparação externas cria uma tabela externa. Para arquivos em áreas de preparação internas, ela não será criada. O fornecimento do esquema reduz o tempo de execução, o que elimina o custo para inferi-lo com base nas fontes.

Para melhor desempenho, filtre de acordo com as limitações de filtragem de tabelas externas do Snowflake.

Exemplo
spark.conf.set("snowpark.connect.sql.partition.external_table_location", "<database-name>.<schema-name>")

spark.read.parquet("@external-stage/example").filter(col("x") > lit(1)).show()

schema = StructType([StructField("x",IntegerType()),StructField("y",DoubleType())])

spark.read.schema(schema).parquet("@external-stage/example").filter(col("x") > lit(1)).show()
Copy

snowpark.connect.temporary.views.create_in_snowflake

Especifique true para criar exibições temporárias diretamente no Snowflake em vez de gerenciá-las localmente.

Padrão: false

Desde: 1.0.1

snowpark.connect.udf.imports [DEPRECATED 1.7.0]

Especifica uma lista separada por vírgulas de arquivos e módulos a serem importados para execução de UDF. Quando esse valor é alterado, a recriação da UDF é acionada.

Padrão: (nenhum)

Desde: 1.0.0

snowpark.connect.udf.python.imports

Especifica uma lista separada por vírgulas de arquivos e módulos a serem importados para execução de UDF em Python. Quando esse valor é alterado, a recriação da UDF é acionada.

Padrão: (nenhum)

Desde: 1.7.0

snowpark.connect.udf.java.imports

Especifica uma lista separada por vírgulas de arquivos e módulos a serem importados para execução de UDF em Java. Aciona a recriação de UDF quando alterada.

Padrão: (nenhum)

Desde: 1.7.0

Comentários

Essa configuração funciona de forma muito semelhante a snowpark.connect.udf.python.imports. Com ela, você pode especificar bibliotecas e arquivos externos para UDFs Java criadas usando registerJavaFunction. As configurações são mutuamente exclusivas para impedir uma mistura de dependências desnecessária.

Para incluir bibliotecas e arquivos externos, você fornece caminhos de área de preparação para os arquivos como valor da definição de configuração snowpark.connect.udf.java.imports. O valor da configuração deve ser uma matriz de caminhos de área de preparação para os arquivos, com os caminhos separados por vírgulas.

Exemplo

O código no exemplo a seguir inclui dois arquivos no contexto de execução da UDF. A UDF importa funções desses arquivos e as utiliza na lógica.

# Files need to be previously staged

spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")

spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")

spark.sql("SELECT javaFunction('arg')").show()
Copy

Você pode usar a configuração snowpark.connect.udf.java.imports para incluir também outros tipos de arquivos, como aqueles com dados que seu código precisa ler. Quando você faz isso, observe que seu código deve ler apenas os arquivos incluídos, qualquer gravação nesses arquivos será perdida após o término da execução da função.

snowpark.connect.udf.packages

Especifica uma lista separada por vírgulas de pacotes Python a serem incluídos ao registrar UDFs.

Padrão: (nenhum)

Desde: 1.0.0

Comentários

Você pode usá-la para definir pacotes adicionais a serem disponibilizados em UDFs Python. O valor é uma lista de dependências separadas por vírgulas.

Você pode descobrir a lista de pacotes compatíveis executando o seguinte SQL no Snowflake:

SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Copy
Exemplo
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")

@udtf(returnType="val: int")

class Powers:

  def eval(self, x: int):
      import numpy as np

      for v in np.power(np.array([x, x, x]), [0, 1, 2]):
          yield (int(v),)

spark.udtf.register(name="powers", f=Powers)

spark.sql("SELECT * FROM powers(10)").show()
Copy

Para obter mais informações, consulte Python.

snowpark.connect.udtf.compatibility_mode

Especifique true para habilitar o comportamento de UDTF compatível com Spark para melhor compatibilidade com a semântica de UDTF do Spark.

Padrão: false

Desde: 1.0.0

Comentários

Esta propriedade determina se as UDTFs devem usar o comportamento compatível com Spark ou o comportamento padrão do Snowpark. Quando definida como true, aplica um wrapper de compatibilidade que imita a coerção de tipo de saída e os padrões de tratamento de erros do Spark.

Quando habilitada, as UDTFs usam um wrapper de compatibilidade que aplica a coerção automática de tipo (por exemplo, cadeia de caracteres «true» para booliana, booliana para número inteiro) e tratamento de erros no estilo do Spark. O wrapper também converte argumentos de tabela em objetos semelhantes à linha para acesso posicional e nomeado, além de processar corretamente valores nulos SQL para corresponder aos padrões de comportamento do Spark.

snowpark.connect.version

Retorna a versão atual do Snowpark Connect for Spark. Somente leitura.

Padrão: <current_version>

Desde: 1.0.0

snowpark.connect.views.duplicate_column_names_handling_mode

Especifica como tratar nomes de colunas duplicados em exibições. Os valores permitidos incluem rename (adicionar sufixo), fail (gerar erro) ou drop (remover duplicatas).

Padrão: rename

Desde: 1.0.0

Comentários

O Snowflake não oferece suporte a nomes de colunas duplicados.

Exemplo

O código a seguir falha na etapa de criação da exibição com o seguinte erro de compilação SQL: «duplicate column name “foo”».

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

Para contorná-lo, defina a opção de configuração snowpark.connect.views.duplicate_column_names_handling_mode como um dos seguintes valores:

  • rename: Um sufixo como _dedup_1, _dedup_2, e assim por diante, será anexado a todos os nomes de colunas duplicadas após o primeiro.

  • drop: Todas as colunas duplicadas, exceto uma, serão descartadas. Se as colunas tiverem valores diferentes, isso poderá retornar resultados incorretos.

snowpark.connect.udf.java.imports

Especifica uma lista separada por vírgulas de arquivos e módulos a serem importados para execução de UDF em Java. Aciona a recriação de UDF quando alterada.

Padrão: (nenhum)

Desde: 1.7.0

Comentários

Essa configuração funciona de forma muito semelhante a snowpark.connect.udf.python.imports. Você pode usá-la para especificar bibliotecas e arquivos externos para UDFs Java criadas usando registerJavaFunction. As configurações são mutuamente exclusivas para impedir uma mistura de dependências desnecessária.

Para incluir bibliotecas e arquivos externos, você fornece caminhos de área de preparação para os arquivos como valor da definição de configuração snowpark.connect.udf.java.imports. O valor é uma matriz de caminhos de área de preparação para os arquivos, com os caminhos separados por vírgulas.

Exemplo

O código no exemplo a seguir inclui dois arquivos no contexto de execução da UDF. A UDF importa funções desses arquivos e as utiliza na lógica.

# Files need to be previously staged

spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")

spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")

spark.sql("SELECT javaFunction('arg')").show()
Copy

Você pode usar a configuração snowpark.connect.udf.java.imports para incluir também outros tipos de arquivos, como aqueles com dados que seu código precisa ler. Quando você faz isso, seu código deve ler apenas os arquivos incluídos, qualquer gravação nesses arquivos será perdida após o término da execução da função.

snowpark.connect.udf.packages

Especifica uma lista separada por vírgulas de pacotes Python a serem incluídos ao registrar UDFs.

Padrão: (nenhum)

Desde: 1.0.0

Comentários

A configuração permite definir pacotes adicionais disponíveis nas UDFs Python. O valor é uma lista de dependências separadas por vírgulas.

Você pode descobrir a lista de pacotes compatíveis executando o seguinte SQL no Snowflake:

SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Copy
Exemplo
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")

@udtf(returnType="val: int")

class Powers:

  def eval(self, x: int):
      import numpy as np

      for v in np.power(np.array([x, x, x]), [0, 1, 2]):
          yield (int(v),)

spark.udtf.register(name="powers", f=Powers)

spark.sql("SELECT * FROM powers(10)").show()
Copy

Referência: Referência de pacotes

snowpark.connect.udtf.compatibility_mode

Especifique true para habilitar o comportamento de UDTF compatível com Spark para melhor compatibilidade com a semântica de UDTF do Spark.

Padrão: false

Desde: 1.0.0

Comentários

Esta configuração determina se as UDTFs devem usar o comportamento compatível com Spark ou o comportamento padrão do Snowpark. Quando habilitada (true), aplica um wrapper de compatibilidade que imita a coerção de tipo de saída e os padrões de tratamento de erros do Spark.

Quando habilitada, as UDTFs usam um wrapper de compatibilidade que aplica a coerção automática de tipo (por exemplo, cadeia de caracteres «true» para booliana, booliana para número inteiro) e tratamento de erros no estilo do Spark. O wrapper também converte argumentos de tabela em objetos semelhantes à linha para acesso posicional e nomeado, além de processar corretamente valores nulos SQL para corresponder aos padrões de comportamento do Spark.