Uso do conector do Spark¶
O conector é compatível com a API padrão do Spark, mas com a adição de opções específicas do Snowflake, que são descritas neste tópico.
Neste tópico, o termo COPY refere-se a:
COPY INTO <tabela> (usado para transferir dados de um estágio interno ou externo para uma tabela).
COPY INTO <local> (usado para transferir dados de uma tabela para um estágio interno ou externo).
Neste tópico:
Verificação da conexão da rede ao Snowflake com SnowCD¶
Após configurar seu driver, você pode avaliar e solucionar problemas de conectividade de rede com o Snowflake usando o SnowCD.
Você pode usar o SnowCD durante o processo de configuração inicial e sob demanda a qualquer momento para avaliar e solucionar problemas de sua conexão de rede ao Snowflake.
Pushdown¶
O conector do Spark aplica o pushdown de predicado e consulta capturando e analisando os planos lógicos do Spark para operações SQL. Quando a fonte de dados é o Snowflake, as operações são convertidas em uma consulta SQL e depois executadas no Snowflake para melhorar o desempenho.
Entretanto, como essa precisa ser quase uma conversão um-para-um dos operadores SQL do Spark para expressões do Snowflake, nem todos os operadores SQL do Spark podem ser otimizados. Quando o pushdown falha, o conector volta para um plano de execução menos otimizado. As operações sem suporte são realizadas no Spark.
Nota
Se você precisar de pushdown para todas as operações, considere escrever seu código para usar API Snowpark em seu lugar.
Abaixo está uma lista das operações com suporte para pushdown (todas as funções abaixo usam seus nomes Spark). Se uma função não estiver nesta lista, um plano do Spark que a utilize pode ser executado no Spark em vez de ser enviado ao Snowflake.
Funções de agregação
Média
Corr
CovPopulation
CovSample
Contagem
Max
Min
StddevPop
StddevSamp
Soma
VariancePop
VarianceSamp
Operadores booleanos
E
Between
Contains
EndsWith
EqualTo
GreaterThan
GreaterThanOrEqual
In
IsNull
IsNotNull
LessThan
LessThanOrEqual
Not
Ou
StartsWith
Funções de data, hora e carimbo de data/hora
DateAdd
DateSub
Month
Quarter
TruncDate
TruncTimestamp
Ano
Funções matemáticas
Operadores aritméticos “+” (adição), “-” (subtração), “*” (multiplicação), “/” (divisão) e “-” (negação unária).
Abs
Acos
Asin
Atan
Ceil
CheckOverflow
Cos
Cosh
Exp
Floor
Greatest
Least
Log
Pi
Pow
PromotePrecision
Rand
Round
Sin
Sinh
Sqrt
Tan
Tanh
Operadores diversos
Alias (expressões AS)
BitwiseAnd
BitwiseNot
BitwiseOr
BitwiseXor
CaseWhen
Cast(child, t, _)
Coalesce
If
MakeDecimal
ScalarSubquery
ShiftLeft
ShiftRight
SortOrder
UnscaledValue
Operadores relacionais
Funções agregadas e cláusulas group-by
Distinct
Filtros
In
InSet
Junções
Limites
Projeções
Classificações (ORDER BY)
Union e Union All
Funções de janela e cláusulas de windowing
Funções de cadeia de caracteres
Ascii
Concat(children)
Length
Like
Lower
StringLPad
StringRPad
StringTranslate
StringTrim
StringTrimLeft
StringTrimRight
Substring
Upper
Funções de janela (nota: não funcionam com o Spark 2.2)
DenseRank
Rank
RowNumber
Uso do conector no Scala¶
Especificação do nome da classe de fonte de dados¶
Para usar o Snowflake como fonte de dados no Spark, use a opção .format
para fornecer o nome da classe do conector do Snowflake que define a fonte de dados.
net.snowflake.spark.snowflake
Para garantir uma verificação no tempo de compilação do nome da classe, a Snowflake recomenda fortemente a definição de uma variável para o nome da classe. Por exemplo:
val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
Além disso, por conveniência, a classe Utils
fornece a variável, que pode ser importada da seguinte forma:
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
Nota
Todos os exemplos neste tópico utilizam SNOWFLAKE_SOURCE_NAME
como a definição de classe.
Habilitação/desabilitação do pushdown em uma sessão¶
A versão 2.1.0 (e superior) do conector é compatível com o pushdown de consulta, que pode melhorar significativamente o desempenho enviando o processamento da consulta para o Snowflake quando ele é a fonte de dados do Spark.
Por padrão, o pushdown está ativado.
Para desabilitar o pushdown em uma sessão do Spark para um determinado DataFrame:
Após instanciar um objeto
SparkSession
, chame o método estáticoSnowflakeConnectorUtils.disablePushdownSession
, passando o objetoSparkSession
. Por exemplo:SnowflakeConnectorUtils.disablePushdownSession(spark)
Onde
spark
é seu objetoSparkSession
.Crie um DataFrame com a opção autopushdown definida como
off
. Por exemplo:val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("query", query) .option("autopushdown", "off") .load()
Observe que você também pode definir a opção
autopushdown
em umMap
que você passa para o métodooptions
(por exemplo, emsfOptions
no exemplo acima).
Para habilitar o pushdown novamente após desativá-lo, chame o método estático SnowflakeConnectorUtils.enablePushdownSession
(passando o objeto SparkSession
) e crie um DataFrame com autopushdown
habilitado.
Movimentação de dados do Snowflake para o Spark¶
Nota
Ao utilizar DataFrames, o conector do Snowflake aceita somente consultas SELECT.
Para ler dados do Snowflake em umDataFrame do Spark:
Use o método
read()
do objetoSqlContext
para construir umDataFrameReader
.Especifique
SNOWFLAKE_SOURCE_NAME
usando o métodoformat()
. Para a definição, consulte Especificação do nome da classe de fonte de dados (neste tópico).Especifique as opções do conector usando o método
option()
ouoptions()
. Para obter mais informações, consulte Definição das opções de configuração para o conector (neste tópico).Especifique uma das seguintes opções para os dados da tabela a serem lidos:
dbtable
: nome da tabela a ser lida. Todas as colunas e registros são recuperados (ou seja, é equivalente aSELECT * FROM db_table
).query
: consulta exata (instrução SELECT) a ser executada.
Notas de uso¶
Atualmente, o conector não aceita outros tipos de consultas (por exemplo, SHOW ou DESC, ou instruções DML) ao usar DataFrames.
Há um limite máximo para o tamanho de uma linha individual. Para obter mais detalhes, consulte Limites no tamanho do texto de consulta.
Considerações de desempenho¶
Ao transferir dados entre o Snowflake e o Spark, use os seguintes métodos para analisar/melhorar o desempenho:
Use o método
net.snowflake.spark.snowflake.Utils.getLastSelect()
para ver a consulta real emitida ao mover os dados do Snowflake para o Spark.Se usar a funcionalidade
filter
ouwhere
do DataFrame do Spark, verifique se os respectivos filtros estão presentes na consulta SQL emitida. O conector do Snowflake tenta converter todos os filtros solicitados pelo Spark para SQL.No entanto, existem formas de filtros que a infraestrutura do Spark hoje não passa para o conector do Snowflake. Como resultado, em algumas situações, um grande número de registros desnecessários é solicitado ao Snowflake.
Se você precisar de apenas um subconjunto de colunas, certifique-se de indicar o subconjunto na consulta SQL.
Em geral, se a consulta SQL emitida não corresponder ao que você espera com base nas operações do DataFrame, use a opção
query
para fornecer a sintaxe SQL exata que deseja.
Exemplos¶
Ler uma tabela inteira:
val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t1") .load()
Ler os resultados de uma consulta:
val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1") .load()
Movimentação de dados do Spark para o Snowflake¶
As etapas para salvar o conteúdo de um DataFrame em uma tabela do Snowflake são semelhantes a gravar do Snowflake no Spark:
Use o método
write()
doDataFrame
para construir umDataFrameWriter
.Especifique
SNOWFLAKE_SOURCE_NAME
usando o métodoformat()
. Para a definição, consulte Especificação do nome da classe de fonte de dados (neste tópico).Especifique as opções do conector usando o método
option()
ouoptions()
. Para obter mais informações, consulte Definição das opções de configuração para o conector (neste tópico).Use a opção
dbtable
para especificar a tabela na qual os dados são gravados.Use o método
mode()
para especificar o modo de salvamento do conteúdo.Para obter mais informações, consulte SaveMode (documentação do Spark).
Exemplos¶
df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t2") .mode(SaveMode.Overwrite) .save()
Exportação de JSON do Spark para o Snowflake¶
DataFrames do Spark podem conter objetos JSON, serializados como cadeias de caracteres. O seguinte código fornece um exemplo de conversão de um DataFrame regular em um DataFrame contendo dados JSON:
val rdd = myDataFrame.toJSON val schema = new StructType(Array(StructField("JSON", StringType))) val jsonDataFrame = sqlContext.createDataFrame( rdd.map(s => Row(s)), schema)
Observe que o jsonDataFrame
resultante contém uma única coluna do tipo StringType
. Como resultado, quando esse DataFrame é exportado para o Snowflake com o modo SaveMode.Overwrite
comum, uma nova tabela no Snowflake é criada com uma única coluna do tipo VARCHAR
.
Para carregar jsonDataFrame
em uma coluna VARIANT
:
Crie uma tabela do Snowflake (conectando-se ao Snowflake em Java usando o driver JDBC do Snowflake). Para explicações sobre os parâmetros de conexão usados no exemplo, consulte Referência dos parâmetros de conexão do driver JDBC.
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; public class SnowflakeJDBCExample { public static void main(String[] args) throws Exception { String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.com/"; Properties properties = new Properties(); properties.put("user", "peter"); properties.put("password", "test"); properties.put("account", "myorganization-myaccount"); properties.put("warehouse", "mywh"); properties.put("db", "mydb"); properties.put("schema", "public"); // get connection System.out.println("Create JDBC connection"); Connection connection = DriverManager.getConnection(jdbcUrl, properties); System.out.println("Done creating JDBC connection\n"); // create statement System.out.println("Create JDBC statement"); Statement statement = connection.createStatement(); System.out.println("Done creating JDBC statement\n"); // create a table System.out.println("Create my_variant_table table"); statement.executeUpdate("create or replace table my_variant_table(json VARIANT)"); statement.close(); System.out.println("Done creating demo table\n"); connection.close(); System.out.println("Close connection\n"); } }
Em vez de usar
SaveMode.Overwrite
, useSaveMode.Append
para reutilizar a tabela existente. Quando o valor da cadeia de caracteres que representa JSON é carregado no Snowflake, porque a coluna de destino é do tipo VARIANT, ela é analisada como JSON. Por exemplo:df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "my_variant_table") .mode(SaveMode.Append) .save()
Execução de instruções SQL DDL/DML¶
Use o método runQuery()
do objeto Utils
para executar instruções SQL DDL/DML, além de consultas, por exemplo:
var sfOptions = Map(
"sfURL" -> "<account_identifier>.snowflakecomputing.com",
"sfUser" -> "<user_name>",
"sfPassword" -> "<password>",
"sfDatabase" -> "<database>",
"sfSchema" -> "<schema>",
"sfWarehouse" -> "<warehouse>"
)
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")
onde sfOptions
é o mapa de parâmetros usado para ler/gravar DataFrames.
O método runQuery
retorna apenas TRUE ou FALSE. Destina-se a instruções que não retornam um conjunto de resultados; por exemplo instruções DDL como CREATE TABLE
e instruções DML como INSERT
, UPDATE
e DELETE
. Não é útil para instruções que retornam um conjunto de resultados, tais como SELECT
ou SHOW
.
Como trabalhar com carimbos de data/hora e fusos horários¶
O Spark fornece apenas um tipo de carimbo de data/hora, equivalente ao tipo Timestamp do Scala/Java. É quase idêntico em comportamento ao tipo de dados TIMESTAMP_LTZ (fuso horário local) do Snowflake. Como tal, ao transferir dados entre o Spark e o Snowflake, a Snowflake recomenda utilizar as seguintes abordagens para preservar a hora corretamente, em relação aos fusos horários:
Use somente o tipo de dados TIMESTAMP_LTZ no Snowflake.
Nota
O mapeamento padrão do tipo de dados de carimbo de data/hora é TIMESTAMP_NTZ (sem fuso horário), então você precisa definir explicitamente o parâmetro TIMESTAMP_TYPE_MAPPING para usar TIMESTAMP_LTZ.
Defina o fuso horário do Spark como
UTC
e use esse fuso horário no Snowflake (ou seja, não defina a opçãosfTimezone
para o conector e não defina explicitamente um fuso horário no Snowflake). Neste cenário, TIMESTAMP_LTZ e TIMESTAMP_NTZ são efetivamente equivalentes.Para definir o fuso horário, adicione a seguinte linha ao seu código Spark:
java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
Se você não implementar uma dessas abordagens, podem ocorrer modificações de hora indesejadas. Por exemplo, considere o seguinte cenário:
O fuso horário do Spark está definido como
America/New_York
.O fuso horário do Snowflake está definido como
Europe/Warsaw
, o que pode ser feito de uma destas formas:Definir
sfTimezone
comoEurope/Warsaw
para o conector.Definir
sfTimezone
comosnowflake
para o conector e definir o parâmetro da sessão TIMEZONE no Snowflake comoEurope/Warsaw
.
Tanto TIMESTAMP_NTZ como TIMESTAMP_LTZ estão em uso no Snowflake.
Neste cenário:
Se um valor representando
12:00:00
em uma coluna TIMESTAMP_NTZ no Snowflake for enviado para o Spark, este valor não traz nenhuma informação de fuso horário. O Spark trata o valor como12:00:00
em Nova Iorque.Se o Spark envia este valor
12:00:00
(em Nova Iorque) de volta para o Snowflake para ser carregado em uma coluna TIMESTAMP_LTZ, ele é automaticamente convertido e carregado como18:00:00
(para o fuso horário de Varsóvia).Se este valor for então convertido para TIMESTAMP_NTZ no Snowflake, o usuário vê
18:00:00
, que é diferente do valor original,12:00:00
.
Para resumir, a Snowflake recomenda seguir estritamente pelo menos uma destas regras:
Use o mesmo fuso horário, idealmente
UTC
, tanto para o Spark como para o Snowflake.Utilize somente o tipo de dados TIMESTAMP_LTZ para transferir dados entre o Spark e o Snowflake.
Exemplo de programa Scala¶
Importante
Este programa de exemplo presume que você está usando a versão 2.2.0 (ou superior) do conector, que usa um estágio interno do Snowflake para armazenar dados temporários e, portanto, não requer um local S3 para armazenamento de dados temporários. Se você estiver usando uma versão anterior, é preciso ter um local S3 e incluir valores para tempdir
, awsAccessKey
, awsSecretKey
para sfOptions
. Para obter mais detalhes, consulte Opções do AWS para transferência de dados externa (neste tópico).
O programa Scala a seguir fornece um caso de uso completo para o conector do Snowflake para Spark. Antes de usar o código, substitua as seguintes cadeias de caracteres pelos valores apropriados, conforme descrito em Definição das opções de configuração para o conector (neste tópico):
<identificador_da_conta>
: seu identificador de conta.<nome_de_usuário>
<senha>
: credenciais de login para o usuário do Snowflake.<banco de dados>
,<esquema>
<warehouse>
: padrões para a sessão do Snowflake.
O programa Scala de exemplo usa autenticação básica (ou seja, nome de usuário e senha). Se quiser autenticar com OAuth, consulte Uso de OAuth externo (neste tópico).
import org.apache.spark.sql._
//
// Configure your Snowflake environment
//
var sfOptions = Map(
"sfURL" -> "<account_identifier>.snowflakecomputing.com",
"sfUser" -> "<user_name>",
"sfPassword" -> "<password>",
"sfDatabase" -> "<database>",
"sfSchema" -> "<schema>",
"sfWarehouse" -> "<warehouse>"
)
//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", "t1")
.load()
//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("query", "select c1, count(*) from t1 group by c1")
.load()
//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", "t2")
.mode(SaveMode.Overwrite)
.save()
Uso do conector com Python¶
Usar o conector com Python é muito semelhante ao uso com Scala.
Recomendamos utilizar o script bin/pyspark
incluído na distribuição Spark.
Configuração do script pyspark
¶
O script pyspark
deve ser configurado de forma semelhante ao script spark-shell
, usando-se as opções --packages
ou --jars
. Por exemplo:
bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3
Não se esqueça de incluir o conector Spark do Snowflake e os arquivos .jar do conector JDBC em sua variável de ambiente CLASSPATH.
Para obter mais informações sobre a configuração do script spark-shell
, consulte Etapa 4: Configurar o cluster Spark local ou ambiente Spark hospedado no Amazon EMR.
Habilitação/desabilitação do pushdown em uma sessão¶
A versão 2.1.0 (e superior) do conector é compatível com o pushdown de consulta, que pode melhorar significativamente o desempenho enviando o processamento da consulta para o Snowflake quando ele é a fonte de dados do Spark.
Por padrão, o pushdown está ativado.
Para desabilitar o pushdown em uma sessão do Spark para um determinado DataFrame:
Após instanciar um objeto
SparkSession
, chame o método estáticoSnowflakeConnectorUtils.disablePushdownSession
, passando o objetoSparkSession
. Por exemplo:sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
Crie um DataFrame com a opção autopushdown definida como
off
. Por exemplo:df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", query) \ .option("autopushdown", "off") \ .load()
Observe que você também pode definir a opção
autopushdown
em umDictionary
que você passa para o métodooptions
(por exemplo, emsfOptions
no exemplo acima).
Para habilitar o pushdown novamente após desativá-lo, chame o método estático SnowflakeConnectorUtils.enablePushdownSession
(passando o objeto SparkSession
) e crie um DataFrame com autopushdown
habilitado.
Script Python de exemplo¶
Importante
Este script de exemplo presume que você está usando a versão 2.2.0 (ou superior) do conector, que usa um estágio interno do Snowflake para armazenar dados temporários e, portanto, não requer um local S3 para armazenar estes dados. Se você estiver usando uma versão anterior, é preciso ter um local S3 e incluir valores para tempdir
, awsAccessKey
, awsSecretKey
para sfOptions
. Para obter mais detalhes, consulte Opções do AWS para transferência de dados externa (neste tópico).
Uma vez configurado o script pyspark
, você pode realizar consultas SQL e outras operações. Aqui está um exemplo de script Python que realiza uma consulta SQL simples. Este script ilustra o uso básico de conectores. A maioria dos exemplos do Scala neste documento pode ser adaptada com um mínimo de esforço/alterações para uso com Python.
O script Python de exemplo usa autenticação básica (ou seja, nome de usuário e senha). Se quiser autenticar com OAuth, consulte Uso de OAuth externo (neste tópico).
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext
sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')
# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")
# Set options below
sfOptions = {
"sfURL" : "<account_identifier>.snowflakecomputing.com",
"sfUser" : "<user_name>",
"sfPassword" : "<password>",
"sfDatabase" : "<database>",
"sfSchema" : "<schema>",
"sfWarehouse" : "<warehouse>"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("query", "select 1 as my_num union all select 2 as my_num") \
.load()
df.show()
Dica
Observe o uso de sfOptions
e SNOWFLAKE_SOURCE_NAME
. Isto simplifica o código e reduz a chance de erros.
Para detalhes sobre as opções aceitas para sfOptions
, consulte Definição das opções de configuração para o conector (neste tópico).
Mapeamentos de tipos de dados¶
O conector do Spark é compatível com a conversão entre muitos tipos de dados comuns.
Do SQL do Spark ao Snowflake¶
Tipo de dados do Spark
Tipo de dados do Snowflake
ArrayType
VARIANT
BinaryType
Sem suporte
BooleanType
BOOLEAN
ByteType
INTEGER. O Snowflake não tem suporte para o tipo BYTE.
DateType
DATE
DecimalType
DECIMAL
DoubleType
DOUBLE
FloatType
FLOAT
IntegerType
INTEGER
LongType
INTEGER
MapType
VARIANT
ShortType
INTEGER
StringType
Se o comprimento for especificado, VARCHAR(N); caso contrário, VARCHAR
StructType
VARIANT
TimestampType
TIMESTAMP
Do Snowflake para o SQL do Spark¶
Tipo de dados do Snowflake
Tipo de dados do Spark
ARRAY
StringType
BIGINT
DecimalType(38, 0)
BINARY
Sem suporte
BLOB
Sem suporte
BOOLEAN
BooleanType
CHAR
StringType
CLOB
StringType
DATE
DateType
DECIMAL
DecimalType
DOUBLE
DoubleType
FLOAT
DoubleType
INTEGER
DecimalType(38, 0)
OBJECT
StringType
TIMESTAMP
TimestampType
TIME
StringType
(Versão do conector do Spark 2.4.14 ou posterior)VARIANT
StringType
Chamada do método DataFrame.show¶
Se você estiver chamando o método DataFrame.show
e passando em um número menor que o número de linhas no DataFrame, construa um DataFrame que contenha apenas as linhas a serem mostradas de forma ordenada.
Para fazer isso:
Chame primeiro o método
sort
para retornar um DataFrame que contenha linhas ordenadas.Chame o método
limit
naquele DataFrame para retornar um DataFrame que contenha apenas as linhas que você deseja mostrar.Chame o método
show
no DataFrame retornado.
Por exemplo, se você quiser mostrar 5 linhas e ter os resultados ordenados pela coluna my_col
:
val dfWithRowsToShow = originalDf.sort("my_col").limit(5) dfWithRowsToShow.show(5)
Caso contrário, se você chamar show
para exibir um subconjunto de linhas no DataFrame, diferentes execuções do código podem resultar na exibição de diferentes linhas.
Definição das opções de configuração para o conector¶
As seções seguintes listam as opções que você define para configurar o comportamento do conector:
Para definir estas opções, chame o método .option(<valor>, <de chave>)
ou .options(<mapa>)
da classe Spark DataframeReader.
Dica
Para facilitar o uso das opções, Snowflake recomenda especificar as opções em um único objeto Map
e chamar .options(<mapa>)
para definir as opções.
Opções de conexão necessárias¶
As seguintes opções são necessárias para a conexão com o Snowflake:
sfUrl
Especifica o nome de host de sua conta no seguinte formato:
account_identifier.snowflakecomputing.com
account_identifier
é seu identificador da conta.sfUser
Nome de login para o usuário do Snowflake.
Observe que você deve usar uma das seguintes opções para autenticar:
sfPassword
Senha para o usuário do Snowflake.
pem_private_key
Chave privada (em formato PEM) para autenticação do par de chaves. Para obter instruções, consulte Autenticação de pares de chaves e rotação de pares de chaves.
sfAuthenticator
Especifica o uso de OAuth externo para autenticar no Snowflake. Defina o valor como
oauth
.A utilização do parâmetro OAuth externo requer a definição do parâmetro
sfToken
.
sfToken
(obrigatório se estiver usando OAuth externo) Definir o valor para o token de acesso do OAuth externo.
Este parâmetro de conexão requer o definir o valor do parâmetro
sfAuthenticator
comooauth
.O padrão é nenhum.
Opções de contexto necessárias¶
As seguintes opções são necessárias para definir o contexto de banco de dados e esquema para a sessão:
sfDatabase
O banco de dados a ser utilizado para a sessão após a conexão.
sfSchema
O esquema a ser usado para a sessão após a conexão.
Opções adicionais de contexto¶
As opções listadas nesta seção não são necessárias.
sfAccount
Identificador da conta (por exemplo,
myorganization-myaccount
). Esta opção não é mais necessária porque o identificador da conta é especificado emsfUrl
. Está documentada aqui apenas para compatibilidade retroativa.sfWarehouse
O warehouse virtual padrão a ser utilizado para a sessão após a conexão.
sfRole
A função de segurança padrão a ser usada para a sessão após a conexão.
Opções de proxy¶
As opções listadas nesta seção não são necessárias.
use_proxy
Especifica se o conector deve usar um proxy:
true
especifica que o conector deve usar um proxy.false
especifica que o conector não deve usar um proxy.
O valor padrão é
false
.proxy_host
(Obrigatório se
use_proxy
fortrue
) Especifica o nome de host do servidor proxy a ser utilizado.proxy_port
(Obrigatório se
use_proxy
fortrue
) Especifica o número da porta do servidor proxy a ser utilizada.proxy_protocol
Especifica o protocolo usado para se conectar ao servidor proxy. Especifique um dos seguintes valores:
http
https
O valor padrão é
http
.Isto só é suportado para Snowflake em AWS.
Esta opção foi adicionada na versão 2.11.1 do Spark Connector.
proxy_user
Especifica o nome do usuário para autenticação no servidor proxy. Defina isto se o servidor proxy exigir autenticação.
Isto só é suportado para Snowflake em AWS.
proxy_password
Especifica a senha de
proxy_user
para autenticação no servidor proxy. Defina isto se o servidor proxy exigir autenticação.Isto só é suportado para Snowflake em AWS.
non_proxy_hosts
Especifica a lista de hosts aos quais o conector deve se conectar diretamente, ignorando o servidor proxy.
Separe os nomes dos hosts com um símbolo de barra vertical com um caractere de escape na URL(
%7C
). Você também pode usar um asterisco (*
) como curinga.Isto só é suportado para Snowflake em AWS.
Opções adicionais¶
As opções listadas nesta seção não são necessárias.
sfTimezone
O fuso horário a ser utilizado pelo Snowflake ao trabalhar com o Spark. Observe que o parâmetro só define o fuso horário no Snowflake; o ambiente do Spark permanece inalterado. Os valores aceitos são:
spark
: use o fuso horário do Spark (padrão).snowflake
: use o fuso horário atual para o Snowflake.sf_default
: use o fuso horário padrão para o usuário do Snowflake que está se conectando.time_zone
: use um fuso horário específico (por exemplo,America/New_York
), se válido.Para obter mais informações sobre o impacto da definição desta opção, consulte Como trabalhar com carimbos de data/hora e fusos horários (neste tópico).
sfCompress
Se definido como
on
(padrão), os dados passados entre o Snowflake e o Spark são compactados.s3MaxFileSize
O tamanho de arquivo usado ao mover dados do Snowflake para o Spark. O padrão é 10MB.
preactions
Uma lista separada por ponto e vírgula de comandos SQL que são executados antes que os dados sejam transferidos entre o Spark e o Snowflake.
Se um comando SQL contém
%s
, o%s
é substituído pelo nome da tabela referenciada para a operação.postactions
Uma lista separada por ponto e vírgula de comandos SQL que são executados depois que os dados são transferidos entre o Spark e o Snowflake.
Se um comando SQL contém
%s
, ele é substituído pelo nome da tabela referenciada para a operação.truncate_columns
Se definido como
on
(padrão), um comando COPY trunca automaticamente as cadeias de texto que excedem o comprimento da coluna de destino. Se definido comooff
, o comando produz um erro se uma cadeia de caracteres carregada exceder o comprimento da coluna de destino.truncate_table
Este parâmetro controla se o Snowflake retém o esquema de uma tabela de destino do Snowflake ao sobrescrever essa tabela.
Por padrão, quando uma tabela de destino no Snowflake é sobregravada, o esquema dessa tabela de destino também é sobregravado; o novo esquema é baseado no esquema da tabela de origem (o dataframe do Spark).
No entanto, às vezes o esquema da origem não é o ideal. Por exemplo, um usuário pode querer que uma tabela de destino do Snowflake seja capaz de armazenar valores FLOAT no futuro, mesmo que o tipo de dados da coluna inicial da fonte seja INTEGER. Nesse caso, o esquema da tabela do Snowflake não deve ser sobregravado; a tabela do Snowflake deve apenas ser truncada e depois reaproveitada com seu esquema atual.
Os valores possíveis deste parâmetro são:
on
off
Se este parâmetro for
on
, o esquema original da tabela de destino será mantido. Se este parâmetro foroff
, então o antigo esquema da tabela será ignorado e um novo esquema será gerado com base no esquema da origem.Este parâmetro é opcional.
O valor padrão deste parâmetro é
off
(ou seja, por padrão o esquema original da tabela é sobregravado).Para detalhes sobre o mapeamento dos tipos de dados do Spark para os tipos de dados do Snowflake (e vice-versa), consulte: Mapeamentos de tipos de dados (neste tópico).
continue_on_error
Esta variável controla se o comando COPY é anulado se o usuário inserir dados inválidos (por exemplo, um formato JSON inválido para uma coluna de tipo de dados variante).
Os valores possíveis são:
on
off
O valor
on
significa continuar mesmo que ocorra um erro. O valoroff
significa cancelar se ocorrer um erro.Este parâmetro é opcional.
O valor padrão deste parâmetro é
off
.Não é recomendável ativar esta opção. Se qualquer erro for relatado enquanto COPYing no Snowflake com o conector do Spark, é provável que isso resulte em dados ausentes.
Nota
Se linhas forem rejeitadas ou faltarem, e essas linhas não estiverem claramente defeituosas na origem de entrada, informe a Snowflake.
usestagingtable
Este parâmetro controla se o carregamento de dados utiliza uma tabela de preparação.
Uma tabela de preparação é uma tabela normal (com um nome temporário) que é criada pelo conector; se a operação de carregamento de dados for bem-sucedida, a tabela de destino original é descartada e a tabela de preparação é renomeada com o nome da tabela de destino original. Se a operação de carregamento de dados falhar, a tabela de preparação é descartada e a tabela de destino permanece com os dados que tinha imediatamente antes da operação. Assim, a tabela de preparação permite que os dados originais da tabela de destino sejam mantidos se a operação falhar. Por segurança, a Snowflake recomenda fortemente o uso de uma tabela de preparação na maioria das circunstâncias.
Para que o conector possa criar uma tabela de preparação, o usuário que executa COPY através do conector do Spark deve ter privilégios suficientes para criar uma tabela. O carregamento direto (ou seja, o carregamento sem utilizar uma tabela de preparação) é útil se o usuário não tiver permissão para criar uma tabela.
Os valores possíveis deste parâmetro são:
on
off
Se o parâmetro for
on
, é utilizada uma tabela de preparação. Se este parâmetro foroff
, então os dados são carregados diretamente na tabela de destino.Este parâmetro é opcional.
O valor padrão deste parâmetro é
on
(ou seja, use uma tabela de preparação).
autopushdown
Este parâmetro controla se o pushdown automático de consultas está habilitado.
Se o pushdown estiver habilitado, quando uma consulta for executada no Spark e se parte da consulta puder ser enviada para o servidor Snowflake, ela será enviada. Isto melhora o desempenho de algumas consultas.
Este parâmetro é opcional.
O valor padrão é
on
se o conector estiver conectado a uma versão compatível do Spark. Caso contrário, o valor padrão éoff
.Se o conector estiver vinculado a uma versão diferente do Spark (por exemplo, se a versão 3.2 do conector estiver vinculada à versão 3.3 do Spark), então o pushdown automático é desabilitado mesmo se este parâmetro estiver definido como
on
.purge
Se estiver definido como
on
, o conector exclui arquivos temporários criados durante a transferência do Spark para o Snowflake via transferência externa de dados. Se este parâmetro estiver definido comooff
, esses arquivos não serão automaticamente excluídos pelo conector.A limpeza funciona somente para transferências do Spark para o Snowflake, e não para transferências do Snowflake para o Spark.
Os valores possíveis são
on
off
O valor padrão é
off
.columnmap
Este parâmetro é útil ao gravar dados do Spark no Snowflake quando os nomes das colunas na tabela do Snowflake não correspondem aos nomes das colunas na tabela do Spark. Você pode criar um mapa que indica qual coluna de origem do Spark corresponde a cada coluna de destino do Snowflake.
O parâmetro é um literal de cadeia de caracteres única, na forma de:
"Map(col_2 -> col_b, col_3 -> col_a)"
Por exemplo, considere o seguinte cenário:
Um dataframe chamado
df
no Spark tem três colunas:col_1
,col_2
,col_3
Uma tabela chamada
tb
no Snowflake tem duas colunas:col_a
,col_b
Você quer copiar os seguintes valores:
De
df.col_2
paratb.col_b
.De
df.col_3
paratb.col_a
.
O valor do parâmetro
columnmap
seria:Map(col_2 -> col_b, col_3 -> col_a)
Você pode gerar este valor executando o seguinte código do Scala:
Map("col_2"->"col_b","col_3"->"col_a").toString()
O valor padrão deste parâmetro é nulo. Em outras palavras, por padrão, os nomes das colunas nas tabelas de origem e destino devem corresponder.
Este parâmetro é usado somente quando se grava do Spark no Snowflake; ele não se aplica quando se grava do Snowflake no Spark.
keep_column_case
Ao gravar uma tabela do Spark no Snowflake, o conector do Spark, por padrão, muda as letras nos nomes das colunas para maiúsculas, a menos que os nomes das colunas estejam entre aspas duplas.
Ao gravar uma tabela do Snowflake no Spark, o conector do Spark, por padrão, adiciona aspas duplas em torno de qualquer nome de coluna que contenha quaisquer caracteres exceto letras maiúsculas, sublinhados e dígitos.
Se você definir keep_column_case como
on
, o conector do Spark não fará estas mudanças.Os valores possíveis são:
on
off
O valor padrão é
off
.column_mapping
O conector deve mapear colunas do dataframe do Spark para a tabela do Snowflake. Isso pode ser feito com base nos nomes das colunas (independentemente da ordem), ou com base na ordem das colunas (ou seja, a primeira coluna do dataframe é mapeada para a primeira coluna na tabela, independentemente do nome da coluna).
Por padrão, o mapeamento é feito com base na ordem. Você pode substituir isso definindo este parâmetro como
name
, o que diz ao conector para mapear colunas com base nos nomes das colunas. (O mapeamento de nomes não faz distinção entre maiúsculas e minúsculas).Os valores possíveis deste parâmetro são:
order
name
O valor padrão é
order
.column_mismatch_behavior
Este parâmetro se aplica somente quando o parâmetro
column_mapping
é definido comoname
.Se os nomes das colunas no dataframe do Spark e na tabela do Snowflake não corresponderem, então:
Se
column_mismatch_behavior
forerror
, o conector do Spark reporta um erro.Se
column_mismatch_behavior
forignore
, o conector do Spark ignora o erro.O driver descarta qualquer coluna no dataframe do Spark que não tenha uma coluna correspondente na tabela do Snowflake.
O driver insere NULLs em qualquer coluna da tabela do Snowflake que não tenha uma coluna correspondente no dataframe do Spark.
Os erros em potencial incluem:
O dataframe do Spark pode conter colunas que são idênticas, exceto quanto a letras maiúsculas/minúsculas. Como o mapeamento de nomes de colunas não diferencia maiúsculas de minúsculas, não é possível determinar o mapeamento correto desde o dataframe até a tabela.
A tabela do Snowflake pode conter colunas que são idênticas, exceto quanto a letras maiúsculas/minúsculas. Como o mapeamento de nomes de colunas não diferencia maiúsculas de minúsculas, não é possível determinar o mapeamento correto desde o dataframe até a tabela.
O dataframe do Spark e a tabela do Snowflake podem não ter nomes de colunas em comum. Em teoria, o conector do Spark poderia inserir NULLs em cada coluna de cada linha, mas isso geralmente é inútil, então o conector lança um erro mesmo que o
column_mismatch_behavior
esteja definido comoignore
.
Os valores possíveis deste parâmetro são:
error
ignore
O valor padrão é
error
.time_output_format
Este parâmetro permite que o usuário especifique o formato dos dados
TIME
retornados.Os valores possíveis deste parâmetro são os valores possíveis para os formatos de hora especificados em Formatos de hora.
Este parâmetro afeta apenas a saída, não a entrada.
timestamp_ntz_output_format
, .timestamp_ltz_output_format
, .timestamp_tz_output_format
Estas opções especificam o formato de saída para os valores de carimbo de data/hora. Os valores padrão destas opções são:
Opção de configuração
Valor padrão
timestamp_ntz_output_format
"YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_ltz_output_format
"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_tz_output_format
"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"
Se estas opções estiverem definidas em
"sf_current"
, o conector utiliza os formatos especificados para a sessão.partition_size_in_mb
Este parâmetro é usado quando o conjunto de resultados da consulta é muito grande e precisa ser dividido em múltiplas partições DataFrame. Este parâmetro especifica o tamanho não compactado recomendado para cada partição DataFrame. Para reduzir o número de partições, aumente este tamanho.
Este tamanho é usado como o valor recomendado; o tamanho real das partições pode ser menor ou maior.
Esta opção se aplica somente quando o parâmetro use_copy_unload é FALSE.
Este parâmetro é opcional.
O valor padrão é
100
(MB).
use_copy_unload
Se esse for
FALSE
, o Snowflake usa o formato de dados Arrow quando SELECTing dados. Se definido comoTRUE
, então o Snowflake reverte para o comportamento antigo de usar o comandoCOPY UNLOAD
para transmitir dados selecionados.Este parâmetro é opcional.
O valor padrão é
FALSE
.treat_decimal_as_long
Se
TRUE
, configura o Spark Connector para retornar valoresLong
(em vez de valoresBigDecimal
) para consultas que retornam o tipoDecimal(precision, 0)
.O valor padrão é
FALSE
.Esta opção foi adicionada na versão 2.11.1 do Spark Connector.
s3_stage_vpce_dns_name
Especifica o nome de DNS de seu ponto de extremidade VPC para o acesso aos estágios internos.
Esta opção foi adicionada na versão 2.11.1 do Spark Connector.
support_share_connection
Se
FALSE
, configura o conector Spark para criar uma nova conexão JDBC para cada trabalho ou ação que usa as mesmas opções do conector Spark para acessar o Snowflake.O valor padrão é
TRUE
, o que significa que os diferentes trabalhos e ações compartilham a mesma conexão JDBC se eles usarem as mesmas opções do conector Spark para acessar o Snowflake.Se você precisar habilitar ou desabilitar essa configuração programaticamente, use as seguintes funções estáticas globais:
SparkConnectorContext.disableSharedConnection()
SparkConnectorContext.enableSharingJDBCConnection()
Nota
Nos seguintes casos especiais, o conector Spark não usa uma conexão JDBC compartilhada:
Se pré-ações ou pós-ações forem definidas e essas pré-ações ou pós-ações não forem CREATE TABLE, DROP TABLE ou MERGE INTO, o conector Spark não usa a conexão compartilhada.
Funções de utilitário em Utils, como
Utils.runQuery()
eUtils.getJDBCConnection()
não usam a conexão compartilhada.
Esta opção foi adicionada na versão 2.11.2 do conector Spark.
force_skip_pre_post_action_check_for_shared_session
Se
TRUE
, configura o conector Spark para desabilitar a validação de pré-ações e pós-ações para compartilhamento de sessão.O valor padrão é
FALSE
.Importante
Antes de definir esta opção, verifique se as consultas em pré-ações e pós-ações não afetam as configurações da sessão. Caso contrário, você pode encontrar problemas com os resultados.
Esta opção foi adicionada na versão 2.11.3 do conector Spark.
Uso de autenticação e rodízio do par de chaves¶
O conector do Spark é compatível com a autenticação de par de chaves e a rotação de chaves.
Para começar, complete a configuração inicial para autenticação de par de chaves como mostrado em Autenticação de pares de chaves e rotação de pares de chaves.
Envie uma cópia não criptografada da chave privada usando a opção de conexão
pem_private_key
.
Atenção
Por razões de segurança, em vez de codificar o pem_private_key
em seu aplicativo, você deve definir o parâmetro dinamicamente após ler a chave a partir de uma fonte segura. Se a chave estiver criptografada, então descriptografe-a e envie a versão descriptografada.
No exemplo com Python, observe que o arquivo pem_private_key
, rsa_key.p8
, está:
Sendo lido diretamente de um arquivo protegido por senha, usando a variável de ambiente
PRIVATE_KEY_PASSPHRASE
.Usando a expressão
pkb
na cadeia de caracteressfOptions
.
Para conectar, você pode salvar o exemplo Python em um arquivo (ou seja, <file.py>
) e então executar o seguinte comando:
spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>
Python
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os
with open("<path>/rsa_key.p8", "rb") as key_file:
p_key = serialization.load_pem_private_key(
key_file.read(),
password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
backend = default_backend()
)
pkb = p_key.private_bytes(
encoding = serialization.Encoding.PEM,
format = serialization.PrivateFormat.PKCS8,
encryption_algorithm = serialization.NoEncryption()
)
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")
sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')
sfOptions = {
"sfURL" : "<account_identifier>.snowflakecomputing.com",
"sfUser" : "<user_name>",
"pem_private_key" : pkb,
"sfDatabase" : "<database>",
"sfSchema" : "schema",
"sfWarehouse" : "<warehouse>"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("query", "COLORS") \
.load()
df.show()
Uso de OAuth externo¶
Começando pela versão do conector do Spark 2.7.0, você pode usar OAuth externo para autenticação no Snowflake usando o programa Scala ou o script Python de exemplo.
Antes de usar o OAuth externo e o conector do Spark para autenticação no Snowflake, configure uma integração de segurança de OAuth externo para um dos servidores de autorização de OAuth externo compatíveis ou para um cliente personalizado de OAuth externo.
Nos exemplos de Scala e Python, observe a substituição do parâmetro sfPassword
pelos parâmetros sfAuthenticator
e sfToken
.
Scala:
// spark connector version val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME import org.apache.spark.sql.DataFrame var sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<username>", "sfAuthenticator" -> "oauth", "sfToken" -> "<external_oauth_access_token>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) // // Create a DataFrame from a Snowflake table // val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "region") .load() // // Join, augment, aggregate, etc. the data in Spark and then use the // Data Source API to write the data back to a table in Snowflake // df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t2") .mode(SaveMode.Overwrite) .save()
Python:
from pyspark import SparkConf, SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import * sc = SparkContext("local", "Simple App") spark = SQLContext(sc) spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>') # You might need to set these sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>") sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>") # Set options below sfOptions = { "sfURL" : "<account_identifier>.snowflakecomputing.com", "sfUser" : "<user_name>", "sfAuthenticator" : "oauth", "sfToken" : "<external_oauth_access_token>", "sfDatabase" : "<database>", "sfSchema" : "<schema>", "sfWarehouse" : "<warehouse>" } SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", "select 1 as my_num union all select 2 as my_num") \ .load() df.show()
Opções do AWS para transferência de dados externa¶
Estas opções são usadas para especificar o local do Amazon S3 onde os dados temporários são armazenados e fornecer detalhes de autenticação para acessar o local. Elas são necessárias somente se você estiver fazendo uma transferência de dados externa. As transferências de dados externas são necessárias se uma das seguintes opções for verdadeira:
Você está usando a versão 2.1.x ou anterior do conector do Spark (que não aceita transferências internas), ou
Sua transferência provavelmente levará 36 horas ou mais (as transferências internas utilizam credenciais temporárias que expiram após 36 horas).
tempDir
O local S3 onde os dados intermediários são armazenados (por exemplo,
s3n://xy12345-bucket/spark-snowflake-tmp/
).Se
tempDir
for especificado, você também deve especificar uma destas opções:awsAccessKey
,awsSecretKey
. outemporary_aws_access_key_id
,temporary_aws_secret_access_key
,temporary_aws_session_token
awsAccessKey
,awsSecretKey
Estas são credenciais AWS padrão que permitem o acesso ao local especificado em
tempDir
. Observe que ambas as opções devem ser definidas em conjunto.Se forem definidas, podem ser recuperadas do objeto
SparkContext
existente.Se você especificar estas variáveis, também deverá especificar
tempDir
.Estas credenciais também devem ser definidas para o cluster Hadoop.
temporary_aws_access_key_id
,temporary_aws_secret_access_key
,temporary_aws_session_token
Elas são credenciais AWS temporárias que permitem o acesso ao local especificado em
tempDir
. Observe que todas essas três opções devem ser definidas em conjunto.Além disso, se essas opções forem definidas, terão precedência sobre as opções
awsAccessKey
eawsSecretKey
.Se você especificar
temporary_aws_access_key_id
,temporary_aws_secret_access_key
etemporary_aws_session_token
, também deverá especificartempDir
. Caso contrário, estes parâmetros são ignorados.check_bucket_configuration
Se definido como
on
(padrão), o conector verifica se o bucket usado para transferência de dados tem uma política de ciclo de vida configurada (consulte Preparação de um bucket AWS S3 externo para mais informações). Se não houver uma política de ciclo de vida presente, um aviso é registrado.Desabilitar essa opção (definindo-a como
off
) ignora essa verificação. Isto pode ser útil se um usuário puder acessar as operações de dados do bucket, mas não as políticas de ciclo de vida. Desabilitar a opção também pode acelerar ligeiramente os tempos de execução da consulta.
Para obter mais detalhes, consulte Autenticação do S3 para troca de dados (neste tópico).
Opções do Azure para transferência de dados externa¶
Esta seção descreve os parâmetros que se aplicam ao armazenamento de blobs do Azure ao fazer transferências de dados externas. As transferências de dados externas são necessárias se uma das seguintes opções for verdadeira:
Você está usando a versão 2.1.x ou anterior do conector do Spark (que não aceita transferências internas), ou
Sua transferência provavelmente levará 36 horas ou mais (as transferências internas utilizam credenciais temporárias que expiram após 36 horas).
Ao utilizar uma transferência externa com armazenamento de blobs do Azure, você especifica a localização do contêiner do Azure e a SAS (assinatura de acesso compartilhado) para esse contêiner usando os parâmetros descritos abaixo.
tempDir
O contêiner de armazenamento de blobs do Azure onde são armazenados os dados intermediários. Ele tem a forma de um URL, por exemplo:
wasb://<contêiner_do_azure>@<conta_do_azure>.<ponto_de_extremidade_do_azure>/
temporary_azure_sas_token
Especifique o token SAS para o armazenamento de blobs do Azure.
Para obter mais detalhes, consulte Autenticação do Azure para troca de dados (neste tópico).
Especificação de informações do Azure para armazenamento temporário no Spark¶
Ao utilizar o armazenamento de blobs do Azure para fornecer armazenamento temporário para transferência de dados entre o Spark e o Snowflake, você deve fornecer ao Spark, assim como ao conector Spark do Snowflake, o local e as credenciais para o armazenamento temporário.
Para fornecer ao Spark o local de armazenamento temporário, execute comandos semelhantes aos seguintes em seu cluster do Spark:
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Observe que o último comando contém as seguintes variáveis:
<contêiner>
e<conta>
: são os nomes do contêiner e da conta para sua implantação do Azure.<ponto_de_extremidade_do_azure>
: o ponto de extremidade de seu local de implantação do Azure. Por exemplo, se você estiver usando uma implantação US do Azure, é provável que o ponto de extremidade sejablob.core.windows.net
.<azure_sas>
: o token de segurança de Assinatura de Acesso Compartilhado.
Substitua cada uma dessas variáveis pelas informações adequadas para sua conta de armazenamento de blobs do Azure.
Passagem de parâmetros da sessão do Snowflake como opções para o conector¶
O conector do Snowflake para Spark aceita o envio de parâmetros arbitrários em nível de sessão ao Snowflake (consulte Parâmetros de sessão para mais informações). Isso pode ser feito adicionando-se um par ("<chave>" -> "<valor>")
ao objeto options
, onde <chave>
é o nome do parâmetro da sessão e <valor>
é o valor.
Nota
O <valor>
deve ser uma cadeia de caracteres entre aspas duplas, mesmo para parâmetros que aceitam números ou valores booleanos (por exemplo, "1"
ou "true"
).
Por exemplo, o código de exemplo a seguir passa o parâmetro de sessão USE_CACHED_RESULT com um valor de "false"
, que desabilita o uso dos resultados de consultas executadas anteriormente:
// ... assuming sfOptions contains Snowflake connector options
// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")
// ... now use sfOptions with the .options() method
Considerações de segurança¶
Os clientes devem garantir que em um sistema Spark com múltiplos nós as comunicações entre os nós sejam seguras. O mestre do Spark envia credenciais do Snowflake aos nós de trabalho do Spark para que eles possam acessar os estágios do Snowflake. Se as comunicações entre o mestre e os nós de trabalho do Spark não forem seguras, as credenciais poderão ser lidas por um terceiro não autorizado.
Autenticação do S3 para troca de dados¶
Esta seção descreve como autenticar ao usar o S3 para troca de dados.
Esta tarefa é necessária somente em uma das seguintes circunstâncias:
A versão do conector do Snowflake para Spark é 2.1.x (ou anterior). A partir da versão v2.2.0, o conector utiliza um estágio temporário interno do Snowflake para a troca de dados. Se você não estiver usando a versão 2.2.0 (ou superior) do conector, a Snowflake recomenda fortemente a atualização para a versão mais recente.
A versão do conector do Snowflake para Spark é 2.2.0 (ou superior), mas seus trabalhos costumam exceder 36 horas. Esta é a duração máxima do token AWS utilizado pelo conector para acessar o estágio interno para troca de dados.
Se estiver usando uma versão mais antiga do conector, você precisa preparar um local S3 que o conector possa usar para trocar dados entre o Snowflake e o Spark.
Para permitir o acesso ao bucket/diretório S3 usado para trocar dados entre o Spark e o Snowflake (como especificado para tempDir
), dois métodos de autenticação são aceitos:
Credenciais permanentes do AWS (também usadas para configurar a autenticação Hadoop/Spark para acessar o S3)
Credenciais temporárias do AWS
Uso de credenciais permanentes do AWS¶
Este é o método de autenticação padrão do AWS. Ele requer um par de valores awsAccessKey
e awsSecretKey
.
Nota
Esses valores também precisam ser usados para configurar o Hadoop/Spark para acessar o S3. Para obter mais informações, incluindo exemplos, consulte Autenticação no Hadoop/Spark usando S3A ou S3N (neste tópico).
Por exemplo:
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>") // Then, configure your Snowflake environment // var sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<user_name>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>", "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"), "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"), "tempdir" -> "s3n://<temp-bucket-name>" )
Para detalhes sobre as opções aceitas por sfOptions
, consulte Opções do AWS para transferência de dados externa (neste tópico).
Autenticação no Hadoop/Spark usando S3A ou S3N¶
Ecossistemas Hadoop/Spark aceitam 2 esquemas URI para acesso ao S3:
s3a://
Novo método recomendado (para Hadoop 2.7 e superior)
Para usar este método, modifique os exemplos do Scala neste tópico para adicionar as seguintes opções de configuração do Hadoop:
val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3a.access.key", <accessKey>) hadoopConf.set("fs.s3a.secret.key", <secretKey>)
Certifique-se de que a opção
tempdir
utilize tambéms3a://
.s3n://
Método mais antigo (para Hadoop 2.6 e anterior)
Em alguns sistemas, é necessário especificá-lo explicitamente como mostrado no exemplo do Scala a seguir:
val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>) hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)
Uso de credenciais temporárias do AWS¶
Este método usa as opções de configuração temporary_aws_access_key_id
, temporary_aws_secret_access_key
e temporary_aws_session_token
para o conector.
Este método oferece segurança adicional fornecendo ao Snowflake apenas acesso temporário ao bucket/diretório S3 utilizado para troca de dados.
Nota
As credenciais temporárias só podem ser usadas para configurar a autenticação S3 para o conector; elas não podem ser usadas para configurar a autenticação Hadoop/Spark.
Além disso, se você fornecer credenciais temporárias, elas têm precedência sobre quaisquer credenciais permanentes que tenham sido fornecidas.
O código do Scala a seguir fornece um exemplo de autenticação usando credenciais temporárias:
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest import net.snowflake.spark.snowflake.Parameters // ... val sts_client = new AWSSecurityTokenServiceClient() val session_token_request = new GetSessionTokenRequest() // Set the token duration to 2 hours. session_token_request.setDurationSeconds(7200) val session_token_result = sts_client.getSessionToken(session_token_request) val session_creds = session_token_result.getCredentials() // Create a new set of Snowflake connector options, based on the existing // sfOptions definition, with additional temporary credential options that override // the credential options in sfOptions. // Note that constants from Parameters are used to guarantee correct // key names, but literal values, such as temporary_aws_access_key_id are, of course, // also allowed. var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId()) sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey()) sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())
sfOptions2
pode agora ser usado com o método de DataFrame options()
.
Autenticação do Azure para troca de dados¶
Esta seção descreve como autenticar ao utilizar o armazenamento de blobs do Azure para troca de dados.
A autenticação desta forma é necessária somente em uma das seguintes circunstâncias:
A versão do conector do Snowflake para Spark é 2.1.x (ou anterior). A partir da versão v2.2.0, o conector utiliza um estágio temporário interno do Snowflake para a troca de dados. Se você não estiver usando a versão 2.2.0 (ou superior) do conector, a Snowflake recomenda fortemente a atualização para a versão mais recente.
A versão do conector do Snowflake para Spark é 2.2.0 (ou superior), mas seus trabalhos costumam exceder 36 horas. Esta é a duração máxima do token Azure utilizado pelo conector para acessar o estágio interno de troca de dados.
Você precisa preparar um contêiner de armazenamento de blobs do Azure que o conector possa usar para trocar dados entre o Snowflake e o Spark.
Uso de credenciais do Azure¶
Este é o método de autenticação padrão do armazenamento de blobs do Azure. Ele requer um par de valores: tempDir
(um URL) e valores temporary_azure_sas_token
.
Nota
Estes valores também precisam ser usados para configurar o Hadoop/Spark para acessar o armazenamento de blobs do Azure. Para obter mais informações, incluindo exemplos, consulte Autenticação no Hadoop/Spark usando Azure (neste tópico).
Por exemplo:
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>) // Then, configure your Snowflake environment // val sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<user_name>", "sfPassword" -> "<password>", "sfDatabase" -> "<database_name>", "sfSchema" -> "<schema_name>", "sfWarehouse" -> "<warehouse_name>", "sfCompress" -> "on", "sfSSL" -> "on", "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/", "temporary_azure_sas_token" -> "<azure_sas>" )
Para detalhes sobre as opções aceitas por sfOptions
, consulte Opções do Azure para transferência de dados externa (neste tópico).
Autenticação no Hadoop/Spark usando Azure¶
Para usar este método, modifique os exemplos do Scala neste tópico para adicionar as seguintes opções de configuração do Hadoop:
val hadoopConf = sc.hadoopConfiguration sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Certifique-se de que a opção tempdir
utilize também wasb://
.
A autenticação através de um navegador não é aceita¶
Ao usar o conector do Spark, é impraticável usar qualquer forma de autenticação que abra uma janela do navegador para pedir credenciais ao usuário. A janela não apareceria necessariamente na máquina do cliente. Portanto, o conector do Spark não aceita nenhum tipo de autenticação, incluindo MFA (autenticação multifator) ou SSO (login único), que chame uma janela do navegador.