Propriedades do Snowpark Connect for Spark

O Snowpark Connect for Spark oferece suporte à configuração personalizada de forma semelhante ao Spark padrão. Você pode modificar as propriedades de configuração somente por meio do método set da sessão usando um par chave-valor. Observe que o Snowpark Connect for Spark reconhece apenas um conjunto limitado de propriedades que influenciam a execução. Todas as propriedades incompatíveis são silenciosamente ignoradas sem gerar uma exceção.

Propriedades do Spark compatíveis

O Snowpark Connect for Spark oferece suporte a um subconjunto de propriedades do Spark.

Nome da propriedade

Padrão

Significado

Desde

spark.app.name

(nenhum)

Nome do aplicativo definido como query_tag do Snowflake (Spark-Connect-App-Name={name}) para rastreamento de consultas.

1.0.0

spark.Catalog.databaseFilterInformationSchema

false

Quando true, filtra INFORMATION_SCHEMA das listagens de banco de dados nas operações de catálogo.

1.0.0

spark.hadoop.fs.s3a.access.key

(nenhum)

ID da chave de acesso da AWS para autenticação no S3 ao ler ou gravar em locais do S3.

1.0.0

spark.hadoop.fs.s3a.assumed.role.arn

(nenhum)

ARN da função do IAM da AWS com acesso ao S3 ao usar autenticação baseada em função.

1.0.0

spark.hadoop.fs.s3a.secret.key

(nenhum)

Chave de acesso secreta da AWS para autenticação no S3 ao ler ou gravar em locais do S3.

1.0.0

spark.hadoop.fs.s3a.server-side-encryption.key

(nenhum)

ID da chave KMS da AWS para criptografia do lado do servidor ao usar o tipo de criptografia AWS_SSE_KMS.

1.0.0

spark.hadoop.fs.s3a.session.token

(nenhum)

Token de sessão da AWS para credenciais temporárias do S3 ao usar STS.

1.0.0

spark.sql.ansi.enabled

false

Habilita o modo ANSI SQL para verificação de tipo e tratamento de erros mais rigorosos. Quando true, estouros aritméticos e conversões inválidas geram erros em vez de retornar NULL.

1.0.0

spark.sql.caseSensitive

false

Controla a diferenciação de letras maiúsculas e minúsculas para identificadores. Quando false, os nomes de colunas e tabelas não diferenciam maiúsculas de minúsculas (colocados automaticamente em maiúsculas no Snowflake).

1.0.0

spark.sql.crossJoin.enabled

true

Habilita ou desabilita junções cruzadas implícitas. Um false ou uma condição de junção ausente ou trivial resultará em erro.

1.0.0

spark.sql.execution.pythonUDTF.arrow.enabled

false

Quando true, habilita a otimização do Apache Arrow para serialização/desserialização de UDTF Python.

1.0.0

spark.sql.globalTempDatabase

global_temp

Nome do esquema para exibições temporárias globais. Criado automaticamente se não existir.

1.0.0

spark.sql.legacy.allowHashOnMapType

false

Quando true, permite hashing de colunas do tipo MAP. Por padrão, os tipos MAP não podem passar por hashing para consistência com o comportamento do Spark.

1.0.0

spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue

false

Comportamento legado para nomeação de chaves de agrupamento de conjuntos de dados.

1.6.0

spark.sql.mapKeyDedupPolicy

EXCEPTION

Controla o comportamento quando chaves duplicadas são encontradas na criação do mapa. Valores: EXCEPTION (gerar erro) ou LAST_WIN (manter o último valor).

1.0.0

spark.sql.parser.quotedRegexColumnNames

false

Quando true, habilita a correspondência de padrão regex em nomes de colunas entre aspas nas consultas SQL (por exemplo, SELECT '(col1|col2)' FROM table).

1.0.0

spark.sql.parquet.outputTimestampType

TIMESTAMP_MILLIS

Controla o tipo de carimbo de data/hora da saída do Parquet. Oferece suporte a TIMESTAMP_MILLIS ou TIMESTAMP_MICROS.

1.7.0

spark.sql.pyspark.inferNestedDictAsStruct.enabled

false

Quando true, infere dicionários Python aninhados como StructType em vez de MapType durante a criação de DataFrame.

1.0.0

spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled

false

Quando true, infere o tipo de elemento da matriz apenas a partir do primeiro elemento, em vez de fazer amostragem de todos os elementos.

1.0.0

spark.sql.repl.eagerEval.enabled

false

Quando true, habilita a avaliação adiantada em REPL mostrando os resultados de DataFrame automaticamente sem chamar show().

1.0.0

spark.sql.repl.eagerEval.maxNumRows

20

Número máximo de linhas a serem exibidas no modo de avaliação adiantada REPL.

1.0.0

spark.sql.repl.eagerEval.truncate

20

Largura máxima para valores de coluna na exibição de avaliação adiantada REPL antes do truncamento.

1.0.0

spark.sql.session.localRelationCacheThreshold

2147483647

Limite de bytes para armazenar em cache relações locais. Relações maiores que esse valor são armazenadas em cache para melhorar o desempenho.

1.0.0

spark.sql.session.timeZone

<system_local_timezone>

Fuso horário da sessão usado para operações de carimbo de data/hora. Sincronizado com a sessão do Snowflake via ALTER SESSION SET TIMEZONE.

1.0.0

spark.sql.sources.default

parquet

Formato padrão da fonte de dados para operações de leitura/gravação quando o formato não é especificado explicitamente.

1.0.0

spark.sql.timestampType

TIMESTAMP_LTZ

Tipo de carimbo de data/hora padrão para operações de carimbo de data/hora. Valores: TIMESTAMP_LTZ (com fuso horário local) ou TIMESTAMP_NTZ (sem fuso horário).

1.0.0

spark.sql.tvf.allowMultipleTableArguments.enabled

true

Quando true, permite que funções com valor de tabela aceitem vários argumentos de tabela.

1.0.0

Propriedades do Snowpark Connect for Spark compatíveis

Propriedades de configuração personalizada específicas do Snowpark Connect for Spark.

Nome da propriedade

Padrão

Significado

Desde

fs.azure.sas.<container>.<account>.blob.core.windows.net

(nenhum)

Token do SAS do Azure para autenticação do Armazenamento de Blobs. Usado ao ler ou gravar em locais do Armazenamento de Blobs do Azure.

1.0.0

fs.azure.sas.fixed.token.<account>.dfs.core.windows.net

(nenhum)

Token do SAS do Azure para autenticação do ADLS Gen2 (Data Lake Storage). Usado ao ler ou gravar em locais do Azure Data Lake Storage Gen2.

1.0.0

mapreduce.fileoutputcommitter.marksuccessfuljobs

false

Quando true, gera um arquivo _SUCCESS após operações de gravação bem-sucedidas para compatibilidade com fluxos de trabalho do Hadoop/Spark.

1.0.0

parquet.enable.summary-metadata

false

Configuração alternativa para gerar arquivos de metadados de resumo do Parquet. Esta configuração ou spark.sql.parquet.enable.summary-metadata habilita o recurso.

1.4.0

snowflake.repartition.for.writes

false

Quando true, força DataFrame.repartition(n) a dividir a saída em arquivos n durante as gravações. Corresponde ao comportamento do Spark, mas adiciona sobrecarga.

1.0.0

snowpark.connect.cte.optimization_enabled

false

Quando true, habilita a otimização da Expressão de Tabela Comum (Common Table Expression, CTE) nas sessões do Snowpark para melhorar o desempenho das consultas.

1.0.0

snowpark.connect.describe_cache_ttl_seconds

300

Tempo de vida em segundos para entradas do cache de consulta. Reduz pesquisas repetidas de esquema.

1.0.0

snowpark.connect.enable_snowflake_extension_behavior

false

Quando true, habilita extensões específicas do Snowflake que podem ser diferentes do comportamento do Spark (como hashing em tipos MAP ou tipo de retorno MD5).

1.0.0

snowpark.connect.handleIntegralOverflow

false

Quando true, o comportamento de estouro integral é alinhado à abordagem do Spark.

1.7.0

snowpark.connect.iceberg.external_volume

(nenhum)

Nome do volume externo do Snowflake para operações de tabela Iceberg.

1.0.0

snowpark.connect.integralTypesEmulation

client_default

Controla a conversão de tipos decimais em integrais. Valores: client_default, enabled, disabled

1.7.0

snowpark.connect.scala.version

2.12

Controla a versão do Scala usada (com suporte para 2.12 ou 2.13)

1.7.0

snowpark.connect.sql.partition.external_table_location

(nenhum)

Caminho do local da tabela externa para gravações particionadas.

1.4.0

snowpark.connect.temporary.views.create_in_snowflake

false

Quando true, cria exibições temporárias diretamente no Snowflake em vez de gerenciá-las localmente.

1.0.1

snowpark.connect.udf.imports [DEPRECATED 1.7.0]

(nenhum)

Lista separada por vírgula de arquivos ou módulos a serem importados para execução de UDF. Aciona a recriação de UDF quando alterada.

1.0.0

snowpark.connect.udf.python.imports

(nenhum)

Lista separada por vírgula de arquivos/módulos a serem importados para execução de UDF em Python. Aciona a recriação de UDF quando alterada.

1.7.0

snowpark.connect.udf.java.imports

(nenhum)

Lista separada por vírgula de arquivos ou módulos a serem importados para execução de UDF em Java. Aciona a recriação de UDF quando alterada.

1.7.0

snowpark.connect.udf.packages

(nenhum)

Lista separada por vírgula de pacotes Python a serem incluídos ao registrar UDFs.

1.0.0

snowpark.connect.udtf.compatibility_mode

false

Quando true, habilita o comportamento de UDTF compatível com Spark para melhor compatibilidade com a semântica de UDTF do Spark.

1.0.0

snowpark.connect.version

<current_version>

Somente leitura. Retorna a versão atual do Snowpark Connect for Spark.

1.0.0

snowpark.connect.views.duplicate_column_names_handling_mode

rename

Como tratar nomes de colunas duplicados em exibições. Valores: rename (adicionar sufixo), fail (gerar erro) ou drop (remover duplicatas).

1.0.0

spark.sql.parquet.enable.summary-metadata

false

Quando true, gera arquivos de metadados de resumo do Parquet (_metadata _common_metadata) durante as gravações do Parquet.

1.4.0

snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables

false

Quando true, permite a substituição de partições em tabelas Snowflake no Spark SQL (INSERT OVERWRITE <table> PARTITION(<partition spec>)).

1.12.3

snowpark.connect.artifact_repository

(nenhum)

Especifica o nome de um repositório de artefatos do Snowflake para a resolução de pacotes UDF/UDTF. Quando definido, os pacotes são resolvidos a partir do repositório especificado em vez do Anaconda.

1.14.0

snowpark.connect.udf.resource_constraint.architecture

(nenhum)

Quando definido como x86, UDFs, UDTFs e operações applyInPandas são criadas com uma restrição de arquitetura x86. Requer um warehouse com uma restrição de recurso x86.

1.13.0

fs.azure.sas.<container>.<account>.blob.core.windows.net

Especifica o token do SAS do Azure para autenticação do Armazenamento de Blobs. Usado ao ler ou gravar em locais do Armazenamento de Blobs do Azure.

Padrão: (nenhum)

Desde: 1.0.0

fs.azure.sas.fixed.token.<account>.dfs.core.windows.net

Especifica o token do SAS do Azure para autenticação do ADLS Gen2 (Data Lake Storage). Usado ao ler ou gravar em locais do Azure Data Lake Storage Gen2.

Padrão: (nenhum)

Desde: 1.0.0

mapreduce.fileoutputcommitter.marksuccessfuljobs

Especifica true para gerar um arquivo _SUCCESS após operações de gravação bem-sucedidas para compatibilidade com fluxos de trabalho do Hadoop ou Spark.

Padrão: false

Desde: 1.0.0

parquet.enable.summary-metadata

Especifica a configuração alternativa para gerar arquivos de metadados de resumo do Parquet. Habilita o recurso com esta propriedade ou spark.sql.parquet.enable.summary-metadata.

Padrão: false

Desde: 1.4.0

snowflake.repartition.for.writes

Especifique true para forçar DataFrame.repartition(n) a dividir a saída em arquivos n durante as gravações. Corresponde ao comportamento do Spark, mas adiciona sobrecarga.

Padrão: false

Desde: 1.0.0

snowpark.connect.cte.optimization_enabled

Especifique true para habilitar a otimização da Expressão de Tabela Comum (Common Table Expression, CTE) na sessão do Snowpark para desempenho das consultas.

Padrão: false

Desde: 1.0.0

Comentários

Configuração que habilita as Expressões de Tabela Comuns do Snowflake (CTEs). Essa configuração otimiza as consultas do Snowflake nas quais há muitos blocos de código repetitivos. Essa modificação vai melhorar tanto a compilação de consultas quanto o desempenho da execução.

snowpark.connect.describe_cache_ttl_seconds

Especifica o tempo de vida, em segundos, para entradas do cache de consulta. Reduz pesquisas repetidas de esquema.

Padrão: 300

Desde: 1.0.0

snowpark.connect.enable_snowflake_extension_behavior

Especifique true para habilitar extensões específicas do Snowflake que podem ser diferentes do comportamento do Spark (como hashing em tipo de retorno MD5 para tipos MAP).

Padrão: false

Desde: 1.0.0

Comentários

Quando definida como true, altera o comportamento de determinadas operações:

snowpark.connect.handleIntegralOverflow

Especifique true para alinhar o comportamento de estouro integral à abordagem do Spark.

Padrão: false

Desde: 1.7.0

snowpark.connect.iceberg.external_volume

Especifica o nome do volume externo do Snowflake para operações de tabela Iceberg.

Padrão: (nenhum)

Desde: 1.0.0

snowpark.connect.integralTypesEmulation

Especifica como converter tipos decimais em integrais. Valores: client_default, enabled, disabled

Padrão: client_default

Desde: 1.7.0

Comentários

Por padrão, o Snowpark Connect for Spark trata todos os tipos integrais como Long. Isso é causado pela maneira como os números são representados no Snowflake. A emulação de tipos integrais permite um mapeamento exato entre os tipos do Snowpark e do Spark ao ler fontes de dados.

A opção padrão client_default ativa a emulação somente quando o script é executado do cliente Scala. Os tipos integrais são mapeados com base nas seguintes precisões:

Precisão

Tipo do Spark

19

LongType

10

IntegerType

5

ShortType

3

ByteType

Outros

DecimalType(precision, 0)

Quando outras precisões são encontradas, o tipo final é mapeado para o DecimalType.

snowpark.connect.scala.version

Especifica a versão do Scala a ser usada (compatível com 2.12 ou 2.13).

Padrão: 2.12

Desde: 1.7.0

snowpark.connect.sql.partition.external_table_location

Especifica o caminho do local da tabela externa para gravações particionadas.

Padrão: (nenhum)

Desde: 1.4.0

Comentários

Para ler apenas um subconjunto exato de arquivos particionados do diretório fornecido, é necessária uma configuração adicional. Este recurso só está disponível para arquivos armazenados em áreas de preparação externas. Para remover os arquivos de leitura, o Snowpark Connect for Spark usa tabelas externas.

Este recurso é habilitado quando a configuração snowpark.connect.sql.partition.external_table_location é definida. Ele deve conter os nomes existentes de bancos de dados e esquemas em que as tabelas externas serão criadas.

A leitura de arquivos Parquet armazenados em áreas de preparação externas cria uma tabela externa. Para arquivos em áreas de preparação internas, ela não será criada. O fornecimento do esquema reduz o tempo de execução, o que elimina o custo para inferi-lo com base nas fontes.

Para melhor desempenho, filtre de acordo com as limitações de filtragem de tabelas externas do Snowflake.

Exemplo
spark.conf.set("snowpark.connect.sql.partition.external_table_location", "<database-name>.<schema-name>")

spark.read.parquet("@external-stage/example").filter(col("x") > lit(1)).show()

schema = StructType([StructField("x",IntegerType()),StructField("y",DoubleType())])

spark.read.schema(schema).parquet("@external-stage/example").filter(col("x") > lit(1)).show()
Copy

snowpark.connect.temporary.views.create_in_snowflake

Especifique true para criar exibições temporárias diretamente no Snowflake em vez de gerenciá-las localmente.

Padrão: false

Desde: 1.0.1

snowpark.connect.udf.imports [DEPRECATED 1.7.0]

Especifica uma lista separada por vírgulas de arquivos e módulos a serem importados para execução de UDF. Quando esse valor é alterado, a recriação da UDF é acionada.

Padrão: (nenhum)

Desde: 1.0.0

snowpark.connect.udf.python.imports

Especifica uma lista separada por vírgulas de arquivos e módulos a serem importados para execução de UDF em Python. Quando esse valor é alterado, a recriação da UDF é acionada.

Padrão: (nenhum)

Desde: 1.7.0

snowpark.connect.udf.java.imports

Especifica uma lista separada por vírgulas de arquivos e módulos a serem importados para execução de UDF em Java. Aciona a recriação de UDF quando alterada.

Padrão: (nenhum)

Desde: 1.7.0

Comentários

Essa configuração funciona de forma muito semelhante a snowpark.connect.udf.python.imports. Com ela, você pode especificar bibliotecas e arquivos externos para UDFs Java criadas usando registerJavaFunction. As configurações são mutuamente exclusivas para impedir uma mistura de dependências desnecessária.

Para incluir bibliotecas e arquivos externos, você fornece caminhos de área de preparação para os arquivos como valor da definição de configuração snowpark.connect.udf.java.imports. O valor da configuração deve ser uma matriz de caminhos de área de preparação para os arquivos, com os caminhos separados por vírgulas.

Exemplo

O código no exemplo a seguir inclui dois arquivos no contexto de execução da UDF. A UDF importa funções desses arquivos e as utiliza na lógica.

# Files need to be previously staged

spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")

spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")

spark.sql("SELECT javaFunction('arg')").show()
Copy

Você pode usar a configuração snowpark.connect.udf.java.imports para incluir também outros tipos de arquivos, como aqueles com dados que seu código precisa ler. Quando você faz isso, observe que seu código deve ler apenas os arquivos incluídos, qualquer gravação nesses arquivos será perdida após o término da execução da função.

snowpark.connect.udf.packages

Especifica uma lista separada por vírgulas de pacotes Python a serem incluídos ao registrar UDFs.

Padrão: (nenhum)

Desde: 1.0.0

Comentários

Você pode usá-la para definir pacotes adicionais a serem disponibilizados em UDFs Python. O valor é uma lista de dependências separadas por vírgulas.

Você pode descobrir a lista de pacotes compatíveis executando o seguinte SQL no Snowflake:

SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Copy
Exemplo
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")

@udtf(returnType="val: int")

class Powers:

  def eval(self, x: int):
      import numpy as np

      for v in np.power(np.array([x, x, x]), [0, 1, 2]):
          yield (int(v),)

spark.udtf.register(name="powers", f=Powers)

spark.sql("SELECT * FROM powers(10)").show()
Copy

Para obter mais informações, consulte Python.

snowpark.connect.udtf.compatibility_mode

Especifique true para habilitar o comportamento de UDTF compatível com Spark para melhor compatibilidade com a semântica de UDTF do Spark.

Padrão: false

Desde: 1.0.0

Comentários

Esta propriedade determina se as UDTFs devem usar o comportamento compatível com Spark ou o comportamento padrão do Snowpark. Quando definida como true, aplica um wrapper de compatibilidade que imita a coerção de tipo de saída e os padrões de tratamento de erros do Spark.

Quando habilitada, as UDTFs usam um wrapper de compatibilidade que aplica a coerção automática de tipo (por exemplo, cadeia de caracteres «true» para booliana, booliana para número inteiro) e tratamento de erros no estilo do Spark. O wrapper também converte argumentos de tabela em objetos semelhantes à linha para acesso posicional e nomeado, além de processar corretamente valores nulos SQL para corresponder aos padrões de comportamento do Spark.

snowpark.connect.version

Retorna a versão atual do Snowpark Connect for Spark. Somente leitura.

Padrão: <current_version>

Desde: 1.0.0

snowpark.connect.views.duplicate_column_names_handling_mode

Especifica como tratar nomes de colunas duplicados em exibições. Os valores permitidos incluem rename (adicionar sufixo), fail (gerar erro) ou drop (remover duplicatas).

Padrão: rename

Desde: 1.0.0

Comentários

O Snowflake não oferece suporte a nomes de colunas duplicados.

Exemplo

O código a seguir falha na etapa de criação da exibição com o seguinte erro de compilação SQL: «duplicate column name “foo”».

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

Para contorná-lo, defina a opção de configuração snowpark.connect.views.duplicate_column_names_handling_mode como um dos seguintes valores:

  • rename: Um sufixo como _dedup_1, _dedup_2, e assim por diante, será anexado a todos os nomes de colunas duplicadas após o primeiro.

  • drop: Todas as colunas duplicadas, exceto uma, serão descartadas. Se as colunas tiverem valores diferentes, isso poderá retornar resultados incorretos.

snowpark.connect.udf.java.imports

Especifica uma lista separada por vírgulas de arquivos e módulos a serem importados para execução de UDF em Java. Aciona a recriação de UDF quando alterada.

Padrão: (nenhum)

Desde: 1.7.0

Comentários

Essa configuração funciona de forma muito semelhante a snowpark.connect.udf.python.imports. Você pode usá-la para especificar bibliotecas e arquivos externos para UDFs Java criadas usando registerJavaFunction. As configurações são mutuamente exclusivas para impedir uma mistura de dependências desnecessária.

Para incluir bibliotecas e arquivos externos, você fornece caminhos de área de preparação para os arquivos como valor da definição de configuração snowpark.connect.udf.java.imports. O valor é uma matriz de caminhos de área de preparação para os arquivos, com os caminhos separados por vírgulas.

Exemplo

O código no exemplo a seguir inclui dois arquivos no contexto de execução da UDF. A UDF importa funções desses arquivos e as utiliza na lógica.

# Files need to be previously staged

spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")

spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")

spark.sql("SELECT javaFunction('arg')").show()
Copy

Você pode usar a configuração snowpark.connect.udf.java.imports para incluir também outros tipos de arquivos, como aqueles com dados que seu código precisa ler. Quando você faz isso, seu código deve ler apenas os arquivos incluídos, qualquer gravação nesses arquivos será perdida após o término da execução da função.

snowpark.connect.udf.packages

Especifica uma lista separada por vírgulas de pacotes Python a serem incluídos ao registrar UDFs.

Padrão: (nenhum)

Desde: 1.0.0

Comentários

A configuração permite definir pacotes adicionais disponíveis nas UDFs Python. O valor é uma lista de dependências separadas por vírgulas.

Você pode descobrir a lista de pacotes compatíveis executando o seguinte SQL no Snowflake:

SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Copy
Exemplo
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")

@udtf(returnType="val: int")

class Powers:

  def eval(self, x: int):
      import numpy as np

      for v in np.power(np.array([x, x, x]), [0, 1, 2]):
          yield (int(v),)

spark.udtf.register(name="powers", f=Powers)

spark.sql("SELECT * FROM powers(10)").show()
Copy

Referência: Referência de pacotes

snowpark.connect.udtf.compatibility_mode

Especifique true para habilitar o comportamento de UDTF compatível com Spark para melhor compatibilidade com a semântica de UDTF do Spark.

Padrão: false

Desde: 1.0.0

Comentários

Esta configuração determina se as UDTFs devem usar o comportamento compatível com Spark ou o comportamento padrão do Snowpark. Quando habilitada (true), usa um wrapper de compatibilidade que aplica a coerção automática de tipo (por exemplo, cadeia de caracteres «true» para booliana, booliana para número inteiro) e tratamento de erros no estilo do Spark.

O wrapper também converte argumentos de tabela em objetos semelhantes à linha para acesso posicional e nomeado, além de processar corretamente valores nulos SQL para corresponder aos padrões de comportamento do Spark.

snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables

Quando true, permite a substituição de partições em tabelas Snowflake no Spark SQL (INSERT OVERWRITE <table> PARTITION(<partition spec>)).

Padrão: false

Desde: 1.12.3

Comentários

As tabelas Snowflake não oferecem suporte a particionamento definido pelo usuário e, por padrão, a substituição de partições resultará em erro. A ativação dessa opção permite usar INSERT OVERWRITE <table> PARTITION(<partition spec>) para executar substituições.

<partition spec> aceitará todas as colunas existentes na tabela de destino.

Exemplo

O código no exemplo a seguir substitui todas as linhas na tabela students com student_id 222222.

spark.conf.set("snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables", True)

# create the students and persons tables as standard Snowflake tables
students_data = [
  ("Ashua Hill", "456 Erica Ct, Cupertino", 111111),
  ("Brian Reed", "723 Kern Ave, Palo Alto", 222222)
]

students_df = spark.createDataFrame(students_data, ["name", "address", "student_id"])
students_df.write.mode("overwrite").saveAsTable("students")

persons_data = [
    ("Dora Williams", "134 Forest Ave, Menlo Park", 123456789),
    ("Eddie Davis", "245 Market St, Milpitas", 345678901)
]

persons_df = spark.createDataFrame(persons_data, ["name", "address", "ssn"])
persons_df.write.mode("overwrite").saveAsTable("persons")

# overwrites all rows in the students table that have a student_id of 222222
spark.sql("""
    INSERT OVERWRITE students PARTITION (student_id = 222222)
    SELECT name, address FROM persons WHERE name = 'Dora Williams'
""").collect()
Copy

snowpark.connect.artifact_repository

Especifica o nome de um repositório de artefatos Snowflake a ser usado para resolução de pacotes ao registrar operações cogroup, applyInPandas, mapInArrow, UDFs e UDTFs. Quando configurado, os pacotes especificados via snowpark.connect.udf.packages são resolvidos a partir do repositório de artefatos especificado em vez do Anaconda.

Padrão: (nenhum)

Desde: 1.14.0

Comentários

Por padrão, Snowpark Connect for Spark resolve pacotes Python do canal Anaconda selecionado do Snowflake. Definir essa configuração como um nome de repositório de artefatos permite a resolução de pacotes de PyPI ou outras fontes configuradas, possibilitando o uso de pacotes que não estão disponíveis no canal Anaconda.

Para obter informações sobre como criar e configurar um repositório de artefatos no Snowflake, consulte:doc:/developer-guide/udf/python/udf-python-packages .

Alterar essa configuração invalida UDFs e UDTFs em cache, fazendo com que elas sejam recriadas com o novo repositório na próxima invocação.

Essa configuração se aplica às seguintes operações:

  • UDFs registradas via decorador @udf ou spark.udf.register()

  • UDTFs registradas via decorador @udtf ou spark.udtf.register()

  • applyInPandas via groupBy().applyInPandas()

  • mapInArrow via DataFrame.mapInArrow()

  • cogroup via groupBy().cogroup().applyInPandas()

Exemplo

O exemplo a seguir configura o repositório de artefatos e, em seguida, define uma UDF que usa pykalman, um pacote disponível no repositório de artefatos, para aplicar a suavização do filtro de Kalman.

spark.conf.set("snowpark.connect.artifact_repository", "my_pypi_repo")
spark.conf.set("snowpark.connect.udf.packages", "[pykalman]")

@udf(returnType=DoubleType())
def kalman_smooth_value(value: float) -> float:
    import numpy as np
    from pykalman import KalmanFilter

    kf = KalmanFilter(
        transition_matrices=[1],
        observation_matrices=[1],
        initial_state_mean=0,
        initial_state_covariance=1,
        observation_covariance=1,
        transition_covariance=0.1,
    )
    observations = np.array([value, value, value])
    smoothed_state_means, _ = kf.smooth(observations)
    return float(smoothed_state_means[-1][0])

df = spark.createDataFrame([(1, 10.0), (2, 20.0), (3, 30.0)], ["id", "value"])
df.select("id", kalman_smooth_value("value").alias("smoothed")).show()
Copy

Para obter mais informações sobre repositórios de artefatos e pacotes disponíveis, consulte Como usar pacotes de terceiros.

snowpark.connect.udf.resource_constraint.architecture

Quando definido como x86, UDFs, UDTFs e operações applyInPandas são criadas com uma restrição de arquitetura x86. Isso requer um warehouse configurado com uma restrição de recursos x86 para execução.

Padrão: (nenhum)

Desde: 1.13.0

Comentários

Alguns pacotes Python de terceiros (como TensorFlow, XGBoost e certas bibliotecas científicas) são compilados apenas para a arquitetura de CPU x86. Definir essa configuração como x86 adiciona RESOURCE_CONSTRAINT=(architecture='x86') à instrução CREATE FUNCTION gerada por Snowpark Connect for Spark, garantindo que a UDF seja executada em uma infraestrutura compatível com x86.

Para usar essa configuração, execute sua carga de trabalho em um warehouse que tenha sido criado com uma restrição de recursos x86. Os seguintes valores de restrição de recursos são compatíveis com x86:

  • MEMORY_1X_x86 (tamanho mínimo do warehouse: XSMALL)

  • MEMORY_16X_x86 (tamanho mínimo do warehouse: MEDIUM)

  • MEMORY_64X_x86 (tamanho mínimo do warehouse: LARGE)

Se o warehouse não tiver uma restrição de recurso x86, a execução da UDF falhará.

Essa configuração se aplica às seguintes operações:

  • UDFs registradas via decorador @udf ou spark.udf.register()

  • UDTFs registradas via decorador @udtf ou spark.udtf.register()

  • applyInPandas via groupBy().applyInPandas()

Exemplo

O exemplo a seguir cria um warehouse com uma restrição de recursos x86 e, em seguida, configura Snowpark Connect for Spark para usar a arquitetura x86 para UDFs.

CREATE WAREHOUSE my_x86_warehouse WITH
  WAREHOUSE_SIZE = 'MEDIUM'
  WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'
  RESOURCE_CONSTRAINT = 'MEMORY_16X_x86';

USE WAREHOUSE my_x86_warehouse;
Copy
spark.conf.set("snowpark.connect.udf.resource_constraint.architecture", "x86")

@udf(returnType=IntegerType())
def add_one(x: int) -> int:
    return x + 1

df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
df.select(add_one(df["value"]).alias("result")).show()
Copy

Para obter mais informações sobre warehouses e restrições de recursos, consulte Warehouses otimizados para Snowpark.