Uso do conector do Spark¶
O conector é compatível com a API padrão do Spark, mas com a adição de opções específicas do Snowflake, que são descritas neste tópico.
Neste tópico, o termo COPY refere-se a:
COPY INTO <tabela> (usado para transferir dados de um estágio interno ou externo para uma tabela).
COPY INTO <local> (usado para transferir dados de uma tabela para um estágio interno ou externo).
Neste tópico:
Verificação da conexão da rede ao Snowflake com SnowCD¶
Após configurar seu driver, você pode avaliar e solucionar problemas de conectividade de rede com o Snowflake usando o SnowCD.
Você pode usar o SnowCD durante o processo de configuração inicial e sob demanda a qualquer momento para avaliar e solucionar problemas de sua conexão de rede ao Snowflake.
Pushdown¶
O conector do Spark aplica o pushdown de predicado e consulta capturando e analisando os planos lógicos do Spark para operações SQL. Quando a fonte de dados é o Snowflake, as operações são convertidas em uma consulta SQL e depois executadas no Snowflake para melhorar o desempenho.
Entretanto, como essa precisa ser quase uma conversão um-para-um dos operadores SQL do Spark para expressões do Snowflake, nem todos os operadores SQL do Spark podem ser otimizados. Quando o pushdown falha, o conector volta para um plano de execução menos otimizado. As operações sem suporte são realizadas no Spark.
Nota
Se você precisar de pushdown para todas as operações, considere escrever seu código para usar Snowpark API em seu lugar.
Abaixo está uma lista das operações com suporte para pushdown (todas as funções abaixo usam seus nomes Spark). Se uma função não estiver nesta lista, um plano do Spark que a utilize pode ser executado no Spark em vez de ser enviado ao Snowflake.
Funções de agregação
Média
Corr
CovPopulation
CovSample
Contagem
Max
Min
StddevPop
StddevSamp
Soma
VariancePop
VarianceSamp
Operadores booleanos
E
Between
Contains
EndsWith
EqualTo
GreaterThan
GreaterThanOrEqual
In
IsNull
IsNotNull
LessThan
LessThanOrEqual
Not
Ou
StartsWith
Funções de data, hora e carimbo de data/hora
DateAdd
DateSub
Month
Quarter
TruncDate
TruncTimestamp
Ano
Funções matemáticas
Operadores aritméticos “+” (adição), “-” (subtração), “*” (multiplicação), “/” (divisão) e “-” (negação unária).
Abs
Acos
Asin
Atan
Ceil
CheckOverflow
Cos
Cosh
Exp
Floor
Greatest
Least
Log
Pi
Pow
PromotePrecision
Rand
Round
Sin
Sinh
Sqrt
Tan
Tanh
Operadores diversos
Alias (expressões AS)
BitwiseAnd
BitwiseNot
BitwiseOr
BitwiseXor
CaseWhen
Cast(child, t, _)
Coalesce
If
MakeDecimal
ScalarSubquery
ShiftLeft
ShiftRight
SortOrder
UnscaledValue
Operadores relacionais
Funções agregadas e cláusulas group-by
Distinct
Filtros
In
InSet
Junções
Limites
Projeções
Classificações (ORDER BY)
Union e Union All
Funções de janela e cláusulas de windowing
Funções de cadeia de caracteres
Ascii
Concat(children)
Length
Like
Lower
StringLPad
StringRPad
StringTranslate
StringTrim
StringTrimLeft
StringTrimRight
Substring
Upper
Funções de janela (nota: não funcionam com o Spark 2.2)
DenseRank
Rank
RowNumber
Uso do conector no Scala¶
Especificação do nome da classe de fonte de dados¶
Para usar o Snowflake como fonte de dados no Spark, use a opção .format para fornecer o nome da classe do conector do Snowflake que define a fonte de dados.
net.snowflake.spark.snowflake
Para garantir uma verificação no tempo de compilação do nome da classe, a Snowflake recomenda fortemente a definição de uma variável para o nome da classe. Por exemplo:
val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
Além disso, por conveniência, a classe Utils fornece a variável, que pode ser importada da seguinte forma:
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
Nota
Todos os exemplos neste tópico utilizam SNOWFLAKE_SOURCE_NAME como a definição de classe.
Habilitação/desabilitação do pushdown em uma sessão¶
A versão 2.1.0 (e superior) do conector é compatível com o pushdown de consulta, que pode melhorar significativamente o desempenho enviando o processamento da consulta para o Snowflake quando ele é a fonte de dados do Spark.
Por padrão, o pushdown está ativado.
Para desabilitar o pushdown em uma sessão do Spark para um determinado DataFrame:
Após instanciar um objeto
SparkSession, chame o método estáticoSnowflakeConnectorUtils.disablePushdownSession, passando o objetoSparkSession. Por exemplo:SnowflakeConnectorUtils.disablePushdownSession(spark)
Onde
sparké seu objetoSparkSession.Crie um DataFrame com a opção autopushdown definida como
off. Por exemplo:val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("query", query) .option("autopushdown", "off") .load()
Observe que você também pode definir a opção
autopushdownem umMapque você passa para o métodooptions(por exemplo, emsfOptionsno exemplo acima).
Para habilitar o pushdown novamente após desativá-lo, chame o método estático SnowflakeConnectorUtils.enablePushdownSession (passando o objeto SparkSession) e crie um DataFrame com autopushdown habilitado.
Movimentação de dados do Snowflake para o Spark¶
Nota
Ao utilizar DataFrames, o conector do Snowflake aceita somente consultas SELECT.
Para ler dados do Snowflake em umDataFrame do Spark:
Use o método
read()do objetoSqlContextpara construir umDataFrameReader.Especifique
SNOWFLAKE_SOURCE_NAMEusando o métodoformat(). Para a definição, consulte Especificação do nome da classe de fonte de dados (neste tópico).Especifique as opções do conector usando o método
option()ouoptions(). Para obter mais informações, consulte Definição das opções de configuração para o conector (neste tópico).Especifique uma das seguintes opções para os dados da tabela a serem lidos:
dbtable: nome da tabela a ser lida. Todas as colunas e registros são recuperados (ou seja, é equivalente aSELECT * FROM db_table).query: consulta exata (instrução SELECT) a ser executada.
Notas de uso¶
Atualmente, o conector não aceita outros tipos de consultas (por exemplo, SHOW ou DESC, ou instruções DML) ao usar DataFrames.
Há um limite máximo para o tamanho de uma linha individual. Para obter mais detalhes, consulte Limites no tamanho do texto de consulta.
Considerações de desempenho¶
Ao transferir dados entre o Snowflake e o Spark, use os seguintes métodos para analisar/melhorar o desempenho:
Use o método
net.snowflake.spark.snowflake.Utils.getLastSelect()para ver a consulta real emitida ao mover os dados do Snowflake para o Spark.Se usar a funcionalidade
filterouwheredo DataFrame do Spark, verifique se os respectivos filtros estão presentes na consulta SQL emitida. O conector do Snowflake tenta converter todos os filtros solicitados pelo Spark para SQL.No entanto, existem formas de filtros que a infraestrutura do Spark hoje não passa para o conector do Snowflake. Como resultado, em algumas situações, um grande número de registros desnecessários é solicitado ao Snowflake.
Se você precisar de apenas um subconjunto de colunas, certifique-se de indicar o subconjunto na consulta SQL.
Em geral, se a consulta SQL emitida não corresponder ao que você espera com base nas operações do DataFrame, use a opção
querypara fornecer a sintaxe SQL exata que deseja.
Exemplos¶
Ler uma tabela inteira:
val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t1") .load()
Ler os resultados de uma consulta:
val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1") .load()
Movimentação de dados do Spark para o Snowflake¶
As etapas para salvar o conteúdo de um DataFrame em uma tabela do Snowflake são semelhantes a gravar do Snowflake no Spark:
Use o método
write()doDataFramepara construir umDataFrameWriter.Especifique
SNOWFLAKE_SOURCE_NAMEusando o métodoformat(). Para a definição, consulte Especificação do nome da classe de fonte de dados (neste tópico).Especifique as opções do conector usando o método
option()ouoptions(). Para obter mais informações, consulte Definição das opções de configuração para o conector (neste tópico).Use a opção
dbtablepara especificar a tabela na qual os dados são gravados.Use o método
mode()para especificar o modo de salvamento do conteúdo.Para obter mais informações, consulte SaveMode (documentação do Spark).
Exemplos¶
df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t2") .mode(SaveMode.Overwrite) .save()
Exportação de JSON do Spark para o Snowflake¶
DataFrames do Spark podem conter objetos JSON, serializados como cadeias de caracteres. O seguinte código fornece um exemplo de conversão de um DataFrame regular em um DataFrame contendo dados JSON:
val rdd = myDataFrame.toJSON val schema = new StructType(Array(StructField("JSON", StringType))) val jsonDataFrame = sqlContext.createDataFrame( rdd.map(s => Row(s)), schema)
Observe que o jsonDataFrame resultante contém uma única coluna do tipo StringType. Como resultado, quando esse DataFrame é exportado para o Snowflake com o modo SaveMode.Overwrite comum, uma nova tabela no Snowflake é criada com uma única coluna do tipo VARCHAR.
Para carregar jsonDataFrame em uma coluna VARIANT:
Crie uma tabela do Snowflake (conectando-se ao Snowflake em Java usando o driver JDBC do Snowflake). Para explicações sobre os parâmetros de conexão usados no exemplo, consulte Referência dos parâmetros de conexão do driver JDBC.
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; public class SnowflakeJDBCExample { public static void main(String[] args) throws Exception { String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.com/"; Properties properties = new Properties(); properties.put("user", "peter"); properties.put("password", "test"); properties.put("account", "myorganization-myaccount"); properties.put("warehouse", "mywh"); properties.put("db", "mydb"); properties.put("schema", "public"); // get connection System.out.println("Create JDBC connection"); Connection connection = DriverManager.getConnection(jdbcUrl, properties); System.out.println("Done creating JDBC connection\n"); // create statement System.out.println("Create JDBC statement"); Statement statement = connection.createStatement(); System.out.println("Done creating JDBC statement\n"); // create a table System.out.println("Create my_variant_table table"); statement.executeUpdate("create or replace table my_variant_table(json VARIANT)"); statement.close(); System.out.println("Done creating demo table\n"); connection.close(); System.out.println("Close connection\n"); } }
Em vez de usar
SaveMode.Overwrite, useSaveMode.Appendpara reutilizar a tabela existente. Quando o valor da cadeia de caracteres que representa JSON é carregado no Snowflake, porque a coluna de destino é do tipo VARIANT, ela é analisada como JSON. Por exemplo:df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "my_variant_table") .mode(SaveMode.Append) .save()
Execução de instruções SQL DDL/DML¶
Use o método runQuery() do objeto Utils para executar instruções SQL DDL/DML, além de consultas, por exemplo:
var sfOptions = Map(
"sfURL" -> "<account_identifier>.snowflakecomputing.com",
"sfUser" -> "<user_name>",
"sfPassword" -> "<password>",
"sfDatabase" -> "<database>",
"sfSchema" -> "<schema>",
"sfWarehouse" -> "<warehouse>"
)
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")
onde sfOptions é o mapa de parâmetros usado para ler/gravar DataFrames.
O método runQuery retorna apenas TRUE ou FALSE. Destina-se a instruções que não retornam um conjunto de resultados; por exemplo instruções DDL como CREATE TABLE e instruções DML como INSERT, UPDATE e DELETE. Não é útil para instruções que retornam um conjunto de resultados, tais como SELECT ou SHOW.
Como trabalhar com carimbos de data/hora e fusos horários¶
O Spark fornece apenas um tipo de carimbo de data/hora, equivalente ao tipo Timestamp do Scala/Java. É quase idêntico em comportamento ao tipo de dados TIMESTAMP_LTZ (fuso horário local) do Snowflake. Como tal, ao transferir dados entre o Spark e o Snowflake, a Snowflake recomenda utilizar as seguintes abordagens para preservar a hora corretamente, em relação aos fusos horários:
Use somente o tipo de dados TIMESTAMP_LTZ no Snowflake.
Nota
O mapeamento padrão do tipo de dados de carimbo de data/hora é TIMESTAMP_NTZ (sem fuso horário), então você precisa definir explicitamente o parâmetro TIMESTAMP_TYPE_MAPPING para usar TIMESTAMP_LTZ.
Defina o fuso horário do Spark como
UTCe use esse fuso horário no Snowflake (ou seja, não defina a opçãosfTimezonepara o conector e não defina explicitamente um fuso horário no Snowflake). Neste cenário, TIMESTAMP_LTZ e TIMESTAMP_NTZ são efetivamente equivalentes.Para definir o fuso horário, adicione a seguinte linha ao seu código Spark:
java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
Se você não implementar uma dessas abordagens, podem ocorrer modificações de hora indesejadas. Por exemplo, considere o seguinte cenário:
O fuso horário do Spark está definido como
America/New_York.O fuso horário do Snowflake está definido como
Europe/Warsaw, o que pode ser feito de uma destas formas:Definir
sfTimezonecomoEurope/Warsawpara o conector.Definir
sfTimezonecomosnowflakepara o conector e definir o parâmetro da sessão TIMEZONE no Snowflake comoEurope/Warsaw.
Tanto TIMESTAMP_NTZ como TIMESTAMP_LTZ estão em uso no Snowflake.
Neste cenário:
Se um valor representando
12:00:00em uma coluna TIMESTAMP_NTZ no Snowflake for enviado para o Spark, este valor não traz nenhuma informação de fuso horário. O Spark trata o valor como12:00:00em Nova Iorque.Se o Spark envia este valor
12:00:00(em Nova Iorque) de volta para o Snowflake para ser carregado em uma coluna TIMESTAMP_LTZ, ele é automaticamente convertido e carregado como18:00:00(para o fuso horário de Varsóvia).Se este valor for então convertido para TIMESTAMP_NTZ no Snowflake, o usuário vê
18:00:00, que é diferente do valor original,12:00:00.
Para resumir, a Snowflake recomenda seguir estritamente pelo menos uma destas regras:
Use o mesmo fuso horário, idealmente
UTC, tanto para o Spark como para o Snowflake.Utilize somente o tipo de dados TIMESTAMP_LTZ para transferir dados entre o Spark e o Snowflake.
Exemplo de programa Scala¶
Importante
Este programa de exemplo presume que você está usando a versão 2.2.0 (ou superior) do conector, que usa um estágio interno do Snowflake para armazenar dados temporários e, portanto, não requer um local S3 para armazenamento de dados temporários. Se você estiver usando uma versão anterior, é preciso ter um local S3 e incluir valores para tempdir, awsAccessKey, awsSecretKey para sfOptions. Para obter mais detalhes, consulte Opções do AWS para transferência de dados externa (neste tópico).
O programa Scala a seguir fornece um caso de uso completo para o conector do Snowflake para Spark. Antes de usar o código, substitua as seguintes cadeias de caracteres pelos valores apropriados, conforme descrito em Definição das opções de configuração para o conector (neste tópico):
<identificador_da_conta>: seu identificador de conta.<nome_de_usuário><senha>: credenciais de login para o usuário do Snowflake.<banco de dados>,<esquema><warehouse>: padrões para a sessão do Snowflake.
O programa Scala de exemplo usa autenticação básica (ou seja, nome de usuário e senha). Se quiser autenticar com OAuth, consulte Uso de OAuth externo (neste tópico).
import org.apache.spark.sql._
//
// Configure your Snowflake environment
//
var sfOptions = Map(
"sfURL" -> "<account_identifier>.snowflakecomputing.com",
"sfUser" -> "<user_name>",
"sfPassword" -> "<password>",
"sfDatabase" -> "<database>",
"sfSchema" -> "<schema>",
"sfWarehouse" -> "<warehouse>"
)
//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", "t1")
.load()
//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("query", "select c1, count(*) from t1 group by c1")
.load()
//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", "t2")
.mode(SaveMode.Overwrite)
.save()
Uso do conector com Python¶
Usar o conector com Python é muito semelhante ao uso com Scala.
Recomendamos utilizar o script bin/pyspark incluído na distribuição Spark.
Configuração do script pyspark¶
O script pyspark deve ser configurado de forma semelhante ao script spark-shell, usando-se as opções --packages ou --jars. Por exemplo:
bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3
Não se esqueça de incluir o conector Spark do Snowflake e os arquivos .jar do conector JDBC em sua variável de ambiente CLASSPATH.
Para obter mais informações sobre a configuração do script spark-shell, consulte Etapa 4: Configurar o cluster Spark local ou ambiente Spark hospedado no Amazon EMR.
Habilitação/desabilitação do pushdown em uma sessão¶
A versão 2.1.0 (e superior) do conector é compatível com o pushdown de consulta, que pode melhorar significativamente o desempenho enviando o processamento da consulta para o Snowflake quando ele é a fonte de dados do Spark.
Por padrão, o pushdown está ativado.
Para desabilitar o pushdown em uma sessão do Spark para um determinado DataFrame:
Após instanciar um objeto
SparkSession, chame o método estáticoSnowflakeConnectorUtils.disablePushdownSession, passando o objetoSparkSession. Por exemplo:sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
Crie um DataFrame com a opção autopushdown definida como
off. Por exemplo:df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", query) \ .option("autopushdown", "off") \ .load()
Observe que você também pode definir a opção
autopushdownem umDictionaryque você passa para o métodooptions(por exemplo, emsfOptionsno exemplo acima).
Para habilitar o pushdown novamente após desativá-lo, chame o método estático SnowflakeConnectorUtils.enablePushdownSession (passando o objeto SparkSession) e crie um DataFrame com autopushdown habilitado.
Script Python de exemplo¶
Importante
Este script de exemplo presume que você está usando a versão 2.2.0 (ou superior) do conector, que usa um estágio interno do Snowflake para armazenar dados temporários e, portanto, não requer um local S3 para armazenar estes dados. Se você estiver usando uma versão anterior, é preciso ter um local S3 e incluir valores para tempdir, awsAccessKey, awsSecretKey para sfOptions. Para obter mais detalhes, consulte Opções do AWS para transferência de dados externa (neste tópico).
Uma vez configurado o script pyspark, você pode realizar consultas SQL e outras operações. Aqui está um exemplo de script Python que realiza uma consulta SQL simples. Este script ilustra o uso básico de conectores. A maioria dos exemplos do Scala neste documento pode ser adaptada com um mínimo de esforço/alterações para uso com Python.
O script Python de exemplo usa autenticação básica (ou seja, nome de usuário e senha). Se quiser autenticar com OAuth, consulte Uso de OAuth externo (neste tópico).
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Simple App").getOrCreate()
# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")
# Set options below
sfOptions = {
"sfURL" : "<account_identifier>.snowflakecomputing.com",
"sfUser" : "<user_name>",
"sfPassword" : "<password>",
"sfDatabase" : "<database>",
"sfSchema" : "<schema>",
"sfWarehouse" : "<warehouse>",
"sfRole" : "Accountadmin"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("query", "select 1 as my_num union all select 2 as my_num") \
.load()
df.show()
Dica
Observe o uso de sfOptions e SNOWFLAKE_SOURCE_NAME. Isto simplifica o código e reduz a chance de erros.
Para detalhes sobre as opções aceitas para sfOptions, consulte Definição das opções de configuração para o conector (neste tópico).
Mapeamentos de tipos de dados¶
O conector do Spark é compatível com a conversão entre muitos tipos de dados comuns.
Do SQL do Spark ao Snowflake¶
Tipo de dados do Spark
Tipo de dados do Snowflake
ArrayTypeVARIANT
BinaryTypeSem suporte
BooleanTypeBOOLEAN
ByteTypeINTEGER. O Snowflake não tem suporte para o tipo BYTE.
DateTypeDATE
DecimalTypeDECIMAL
DoubleTypeDOUBLE
FloatTypeFLOAT
IntegerTypeINTEGER
LongTypeINTEGER
MapTypeVARIANT
ShortTypeINTEGER
StringTypeSe o comprimento for especificado, VARCHAR(N); caso contrário, VARCHAR
StructTypeVARIANT
TimestampTypeTIMESTAMP
Do Snowflake para o SQL do Spark¶
Tipo de dados do Snowflake
Tipo de dados do Spark
ARRAY
StringTypeBIGINT
DecimalType(38, 0)BINARY
Sem suporte
BLOB
Sem suporte
BOOLEAN
BooleanTypeCHAR
StringTypeCLOB
StringTypeDATE
DateTypeDECIMAL
DecimalTypeDOUBLE
DoubleTypeFLOAT
DoubleTypeINTEGER
DecimalType(38, 0)OBJECT
StringTypeTIMESTAMP
TimestampTypeTIME
StringType(Versão do conector do Spark 2.4.14 ou posterior)VARIANT
StringType
Chamada do método DataFrame.show¶
Se você estiver chamando o método DataFrame.show e passando em um número menor que o número de linhas no DataFrame, construa um DataFrame que contenha apenas as linhas a serem mostradas de forma ordenada.
Para fazer isso:
Chame primeiro o método
sortpara retornar um DataFrame que contenha linhas ordenadas.Chame o método
limitnaquele DataFrame para retornar um DataFrame que contenha apenas as linhas que você deseja mostrar.Chame o método
showno DataFrame retornado.
Por exemplo, se você quiser mostrar 5 linhas e ter os resultados ordenados pela coluna my_col:
val dfWithRowsToShow = originalDf.sort("my_col").limit(5) dfWithRowsToShow.show(5)
Caso contrário, se você chamar show para exibir um subconjunto de linhas no DataFrame, diferentes execuções do código podem resultar na exibição de diferentes linhas.
Definição das opções de configuração para o conector¶
As seções seguintes listam as opções que você define para configurar o comportamento do conector:
Para definir estas opções, chame o método .option(<valor>, <de chave>) ou .options(<mapa>) da classe Spark DataframeReader.
Dica
Para facilitar o uso das opções, Snowflake recomenda especificar as opções em um único objeto Map e chamar .options(<mapa>) para definir as opções.
Opções de conexão necessárias¶
As seguintes opções são necessárias para a conexão com o Snowflake:
sfUrlEspecifica o nome de host de sua conta no seguinte formato:
account_identifier.snowflakecomputing.comaccount_identifieré seu identificador da conta.sfUserNome de login para o usuário do Snowflake.
Observe que você deve usar uma das seguintes opções para autenticar:
sfPasswordSenha para o usuário do Snowflake.
pem_private_keyChave privada (em formato PEM) para autenticação do par de chaves. Para obter instruções, consulte Autenticação de pares de chaves e rotação de pares de chaves.
sfAuthenticatorEspecifica o uso de OAuth externo para autenticar no Snowflake. Defina o valor como
oauth.A utilização do parâmetro OAuth externo requer a definição do parâmetro
sfToken.
sfToken(obrigatório se estiver usando OAuth externo) Definir o valor para o token de acesso do OAuth externo.
Este parâmetro de conexão requer o definir o valor do parâmetro
sfAuthenticatorcomooauth.O padrão é nenhum.
Opções de contexto necessárias¶
As seguintes opções são necessárias para definir o contexto de banco de dados e esquema para a sessão:
sfDatabaseO banco de dados a ser utilizado para a sessão após a conexão.
sfSchemaO esquema a ser usado para a sessão após a conexão.
Opções adicionais de contexto¶
As opções listadas nesta seção não são necessárias.
sfAccountIdentificador da conta (por exemplo,
myorganization-myaccount). Esta opção não é mais necessária porque o identificador da conta é especificado emsfUrl. Está documentada aqui apenas para compatibilidade retroativa.sfWarehouseO warehouse virtual padrão a ser utilizado para a sessão após a conexão.
sfRoleA função de segurança padrão a ser usada para a sessão após a conexão.
Opções de proxy¶
As opções listadas nesta seção não são necessárias.
use_proxyEspecifica se o conector deve usar um proxy:
trueespecifica que o conector deve usar um proxy.falseespecifica que o conector não deve usar um proxy.
O valor padrão é
false.proxy_host(Obrigatório se
use_proxyfortrue) Especifica o nome de host do servidor proxy a ser utilizado.proxy_port(Obrigatório se
use_proxyfortrue) Especifica o número da porta do servidor proxy a ser utilizada.proxy_protocolEspecifica o protocolo usado para se conectar ao servidor proxy. Especifique um dos seguintes valores:
httphttps
O valor padrão é
http.Isto só é suportado para Snowflake em AWS.
Esta opção foi adicionada na versão 2.11.1 do Spark Connector.
proxy_userEspecifica o nome do usuário para autenticação no servidor proxy. Defina isto se o servidor proxy exigir autenticação.
Isto só é suportado para Snowflake em AWS.
proxy_passwordEspecifica a senha de
proxy_userpara autenticação no servidor proxy. Defina isto se o servidor proxy exigir autenticação.Isto só é suportado para Snowflake em AWS.
non_proxy_hostsEspecifica a lista de hosts aos quais o conector deve se conectar diretamente, ignorando o servidor proxy.
Separe os nomes dos hosts com um símbolo de barra vertical com um caractere de escape na URL(
%7C). Você também pode usar um asterisco (*) como curinga.Isto só é suportado para Snowflake em AWS.
Opções adicionais¶
As opções listadas nesta seção não são necessárias.
sfTimezoneO fuso horário a ser utilizado pelo Snowflake ao trabalhar com o Spark. Observe que o parâmetro só define o fuso horário no Snowflake; o ambiente do Spark permanece inalterado. Os valores aceitos são:
spark: use o fuso horário do Spark (padrão).snowflake: use o fuso horário atual para o Snowflake.sf_default: use o fuso horário padrão para o usuário do Snowflake que está se conectando.time_zone: use um fuso horário específico (por exemplo,America/New_York), se válido.Para obter mais informações sobre o impacto da definição desta opção, consulte Como trabalhar com carimbos de data/hora e fusos horários (neste tópico).
sfCompressSe definido como
on(padrão), os dados passados entre o Snowflake e o Spark são compactados.s3MaxFileSizeO tamanho de arquivo usado ao mover dados do Snowflake para o Spark. O padrão é 10MB.
preactionsUma lista separada por ponto e vírgula de comandos SQL que são executados antes que os dados sejam transferidos entre o Spark e o Snowflake.
Se um comando SQL contém
%s, o%sé substituído pelo nome da tabela referenciada para a operação.postactionsUma lista separada por ponto e vírgula de comandos SQL que são executados depois que os dados são transferidos entre o Spark e o Snowflake.
Se um comando SQL contém
%s, ele é substituído pelo nome da tabela referenciada para a operação.truncate_columnsSe definido como
on(padrão), um comando COPY trunca automaticamente as cadeias de texto que excedem o comprimento da coluna de destino. Se definido comooff, o comando produz um erro se uma cadeia de caracteres carregada exceder o comprimento da coluna de destino.truncate_tableEste parâmetro controla se o Snowflake retém o esquema de uma tabela de destino do Snowflake ao sobrescrever essa tabela.
Por padrão, quando uma tabela de destino no Snowflake é sobregravada, o esquema dessa tabela de destino também é sobregravado; o novo esquema é baseado no esquema da tabela de origem (o dataframe do Spark).
No entanto, às vezes o esquema da origem não é o ideal. Por exemplo, um usuário pode querer que uma tabela de destino do Snowflake seja capaz de armazenar valores FLOAT no futuro, mesmo que o tipo de dados da coluna inicial da fonte seja INTEGER. Nesse caso, o esquema da tabela do Snowflake não deve ser sobregravado; a tabela do Snowflake deve apenas ser truncada e depois reaproveitada com seu esquema atual.
Os valores possíveis deste parâmetro são:
onoff
Se este parâmetro for
on, o esquema original da tabela de destino será mantido. Se este parâmetro foroff, então o antigo esquema da tabela será ignorado e um novo esquema será gerado com base no esquema da origem.Este parâmetro é opcional.
O valor padrão deste parâmetro é
off(ou seja, por padrão o esquema original da tabela é sobregravado).Para detalhes sobre o mapeamento dos tipos de dados do Spark para os tipos de dados do Snowflake (e vice-versa), consulte: Mapeamentos de tipos de dados (neste tópico).
continue_on_errorEsta variável controla se o comando COPY é anulado se o usuário inserir dados inválidos (por exemplo, um formato JSON inválido para uma coluna de tipo de dados variante).
Os valores possíveis são:
onoff
O valor
onsignifica continuar mesmo que ocorra um erro. O valoroffsignifica cancelar se ocorrer um erro.Este parâmetro é opcional.
O valor padrão deste parâmetro é
off.Não é recomendável ativar esta opção. Se qualquer erro for relatado enquanto COPYing no Snowflake com o conector do Spark, é provável que isso resulte em dados ausentes.
Nota
Se linhas forem rejeitadas ou faltarem, e essas linhas não estiverem claramente defeituosas na origem de entrada, informe a Snowflake.
usestagingtableEste parâmetro controla se o carregamento de dados utiliza uma tabela de preparação.
Uma tabela de preparação é uma tabela normal (com um nome temporário) que é criada pelo conector; se a operação de carregamento de dados for bem-sucedida, a tabela de destino original é descartada e a tabela de preparação é renomeada com o nome da tabela de destino original. Se a operação de carregamento de dados falhar, a tabela de preparação é descartada e a tabela de destino permanece com os dados que tinha imediatamente antes da operação. Assim, a tabela de preparação permite que os dados originais da tabela de destino sejam mantidos se a operação falhar. Por segurança, a Snowflake recomenda fortemente o uso de uma tabela de preparação na maioria das circunstâncias.
Para que o conector possa criar uma tabela de preparação, o usuário que executa COPY através do conector do Spark deve ter privilégios suficientes para criar uma tabela. O carregamento direto (ou seja, o carregamento sem utilizar uma tabela de preparação) é útil se o usuário não tiver permissão para criar uma tabela.
Os valores possíveis deste parâmetro são:
onoff
Se o parâmetro for
on, é utilizada uma tabela de preparação. Se este parâmetro foroff, então os dados são carregados diretamente na tabela de destino.Este parâmetro é opcional.
O valor padrão deste parâmetro é
on(ou seja, use uma tabela de preparação).
autopushdownEste parâmetro controla se o pushdown automático de consultas está habilitado.
Se o pushdown estiver habilitado, quando uma consulta for executada no Spark e se parte da consulta puder ser enviada para o servidor Snowflake, ela será enviada. Isto melhora o desempenho de algumas consultas.
Este parâmetro é opcional.
O valor padrão é
onse o conector estiver conectado a uma versão compatível do Spark. Caso contrário, o valor padrão éoff.Se o conector estiver vinculado a uma versão diferente do Spark (por exemplo, se a versão 3.2 do conector estiver vinculada à versão 3.3 do Spark), então o pushdown automático é desabilitado mesmo se este parâmetro estiver definido como
on.purgeSe estiver definido como
on, o conector exclui arquivos temporários criados durante a transferência do Spark para o Snowflake via transferência externa de dados. Se este parâmetro estiver definido comooff, esses arquivos não serão automaticamente excluídos pelo conector.A limpeza funciona somente para transferências do Spark para o Snowflake, e não para transferências do Snowflake para o Spark.
Os valores possíveis são
onoff
O valor padrão é
off.columnmapEste parâmetro é útil ao gravar dados do Spark no Snowflake quando os nomes das colunas na tabela do Snowflake não correspondem aos nomes das colunas na tabela do Spark. Você pode criar um mapa que indica qual coluna de origem do Spark corresponde a cada coluna de destino do Snowflake.
O parâmetro é um literal de cadeia de caracteres única, na forma de:
"Map(col_2 -> col_b, col_3 -> col_a)"Por exemplo, considere o seguinte cenário:
Um dataframe chamado
dfno Spark tem três colunas:col_1,col_2,col_3Uma tabela chamada
tbno Snowflake tem duas colunas:col_a,col_bVocê quer copiar os seguintes valores:
De
df.col_2paratb.col_b.De
df.col_3paratb.col_a.
O valor do parâmetro
columnmapseria:Map(col_2 -> col_b, col_3 -> col_a)Você pode gerar este valor executando o seguinte código do Scala:
Map("col_2"->"col_b","col_3"->"col_a").toString()O valor padrão deste parâmetro é nulo. Em outras palavras, por padrão, os nomes das colunas nas tabelas de origem e destino devem corresponder.
Este parâmetro é usado somente quando se grava do Spark no Snowflake; ele não se aplica quando se grava do Snowflake no Spark.
keep_column_caseAo gravar uma tabela do Spark no Snowflake, o conector do Spark, por padrão, muda as letras nos nomes das colunas para maiúsculas, a menos que os nomes das colunas estejam entre aspas duplas.
Ao gravar uma tabela do Snowflake no Spark, o conector do Spark, por padrão, adiciona aspas duplas em torno de qualquer nome de coluna que contenha quaisquer caracteres exceto letras maiúsculas, sublinhados e dígitos.
Se você definir keep_column_case como
on, o conector do Spark não fará estas mudanças.Os valores possíveis são:
onoff
O valor padrão é
off.column_mappingO conector deve mapear colunas do dataframe do Spark para a tabela do Snowflake. Isso pode ser feito com base nos nomes das colunas (independentemente da ordem), ou com base na ordem das colunas (ou seja, a primeira coluna do dataframe é mapeada para a primeira coluna na tabela, independentemente do nome da coluna).
Por padrão, o mapeamento é feito com base na ordem. Você pode substituir isso definindo este parâmetro como
name, o que diz ao conector para mapear colunas com base nos nomes das colunas. (O mapeamento de nomes não faz distinção entre maiúsculas e minúsculas).Os valores possíveis deste parâmetro são:
ordername
O valor padrão é
order.column_mismatch_behaviorEste parâmetro se aplica somente quando o parâmetro
column_mappingé definido comoname.Se os nomes das colunas no dataframe do Spark e na tabela do Snowflake não corresponderem, então:
Se
column_mismatch_behaviorforerror, o conector do Spark reporta um erro.Se
column_mismatch_behaviorforignore, o conector do Spark ignora o erro.O driver descarta qualquer coluna no dataframe do Spark que não tenha uma coluna correspondente na tabela do Snowflake.
O driver insere NULLs em qualquer coluna da tabela do Snowflake que não tenha uma coluna correspondente no dataframe do Spark.
Os erros em potencial incluem:
O dataframe do Spark pode conter colunas que são idênticas, exceto quanto a letras maiúsculas/minúsculas. Como o mapeamento de nomes de colunas não diferencia maiúsculas de minúsculas, não é possível determinar o mapeamento correto desde o dataframe até a tabela.
A tabela do Snowflake pode conter colunas que são idênticas, exceto quanto a letras maiúsculas/minúsculas. Como o mapeamento de nomes de colunas não diferencia maiúsculas de minúsculas, não é possível determinar o mapeamento correto desde o dataframe até a tabela.
O dataframe do Spark e a tabela do Snowflake podem não ter nomes de colunas em comum. Em teoria, o conector do Spark poderia inserir NULLs em cada coluna de cada linha, mas isso geralmente é inútil, então o conector lança um erro mesmo que o
column_mismatch_behavioresteja definido comoignore.
Os valores possíveis deste parâmetro são:
errorignore
O valor padrão é
error.time_output_formatEste parâmetro permite que o usuário especifique o formato dos dados
TIMEretornados.Os valores possíveis deste parâmetro são os valores possíveis para os formatos de hora especificados em Formatos de hora.
Este parâmetro afeta apenas a saída, não a entrada.
timestamp_ntz_output_format, .timestamp_ltz_output_format, .timestamp_tz_output_formatEstas opções especificam o formato de saída para os valores de carimbo de data/hora. Os valores padrão destas opções são:
Opção de configuração
Valor padrão
timestamp_ntz_output_format"YYYY-MM-DD HH24:MI:SS.FF3"timestamp_ltz_output_format"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"timestamp_tz_output_format"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"Se estas opções estiverem definidas em
"sf_current", o conector utiliza os formatos especificados para a sessão.partition_size_in_mbEste parâmetro é usado quando o conjunto de resultados da consulta é muito grande e precisa ser dividido em múltiplas partições DataFrame. Este parâmetro especifica o tamanho não compactado recomendado para cada partição DataFrame. Para reduzir o número de partições, aumente este tamanho.
Este tamanho é usado como o valor recomendado; o tamanho real das partições pode ser menor ou maior.
Esta opção se aplica somente quando o parâmetro use_copy_unload é FALSE.
Este parâmetro é opcional.
O valor padrão é
100(MB).
use_copy_unloadSe esse for
FALSE, o Snowflake usa o formato de dados Arrow quando SELECTing dados. Se definido comoTRUE, então o Snowflake reverte para o comportamento antigo de usar o comandoCOPY UNLOADpara transmitir dados selecionados.Este parâmetro é opcional.
O valor padrão é
FALSE.treat_decimal_as_longSe
TRUE, configura o Spark Connector para retornar valoresLong(em vez de valoresBigDecimal) para consultas que retornam o tipoDecimal(precision, 0).O valor padrão é
FALSE.Esta opção foi adicionada na versão 2.11.1 do Spark Connector.
s3_stage_vpce_dns_nameEspecifica o nome de DNS de seu ponto de extremidade VPC para o acesso aos estágios internos.
Esta opção foi adicionada na versão 2.11.1 do Spark Connector.
support_share_connectionSe
FALSE, configura o conector Spark para criar uma nova conexão JDBC para cada trabalho ou ação que usa as mesmas opções do conector Spark para acessar o Snowflake.O valor padrão é
TRUE, o que significa que os diferentes trabalhos e ações compartilham a mesma conexão JDBC se eles usarem as mesmas opções do conector Spark para acessar o Snowflake.Se você precisar habilitar ou desabilitar essa configuração programaticamente, use as seguintes funções estáticas globais:
SparkConnectorContext.disableSharedConnection()SparkConnectorContext.enableSharingJDBCConnection()
Nota
Nos seguintes casos especiais, o conector Spark não usa uma conexão JDBC compartilhada:
Se pré-ações ou pós-ações forem definidas e essas pré-ações ou pós-ações não forem CREATE TABLE, DROP TABLE ou MERGE INTO, o conector Spark não usa a conexão compartilhada.
Funções de utilitário em Utils, como
Utils.runQuery()eUtils.getJDBCConnection()não usam a conexão compartilhada.
Esta opção foi adicionada na versão 2.11.2 do conector Spark.
force_skip_pre_post_action_check_for_shared_sessionSe
TRUE, configura o conector Spark para desabilitar a validação de pré-ações e pós-ações para compartilhamento de sessão.O valor padrão é
FALSE.Importante
Antes de definir esta opção, verifique se as consultas em pré-ações e pós-ações não afetam as configurações da sessão. Caso contrário, você pode encontrar problemas com os resultados.
Esta opção foi adicionada na versão 2.11.3 do conector Spark.
Uso de autenticação e rodízio do par de chaves¶
O conector do Spark é compatível com a autenticação de par de chaves e a rotação de chaves.
Para começar, complete a configuração inicial para autenticação de par de chaves como mostrado em Autenticação de pares de chaves e rotação de pares de chaves.
Envie uma cópia não criptografada da chave privada usando a opção de conexão
pem_private_key.
Atenção
Por razões de segurança, em vez de codificar o pem_private_key em seu aplicativo, você deve definir o parâmetro dinamicamente após ler a chave a partir de uma fonte segura. Se a chave estiver criptografada, então descriptografe-a e envie a versão descriptografada.
No exemplo com Python, observe que o arquivo pem_private_key, rsa_key.p8, está:
Sendo lido diretamente de um arquivo protegido por senha, usando a variável de ambiente
PRIVATE_KEY_PASSPHRASE.Usando a expressão
pkbna cadeia de caracteressfOptions.
Para conectar, você pode salvar o exemplo Python em um arquivo (ou seja, <file.py>) e então executar o seguinte comando:
spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>
Python
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os
with open("<path>/rsa_key.p8", "rb") as key_file:
p_key = serialization.load_pem_private_key(
key_file.read(),
password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
backend = default_backend()
)
pkb = p_key.private_bytes(
encoding = serialization.Encoding.PEM,
format = serialization.PrivateFormat.PKCS8,
encryption_algorithm = serialization.NoEncryption()
)
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")
sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')
sfOptions = {
"sfURL" : "<account_identifier>.snowflakecomputing.com",
"sfUser" : "<user_name>",
"pem_private_key" : pkb,
"sfDatabase" : "<database>",
"sfSchema" : "schema",
"sfWarehouse" : "<warehouse>"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("query", "COLORS") \
.load()
df.show()
Uso de OAuth externo¶
Começando pela versão do conector do Spark 2.7.0, você pode usar OAuth externo para autenticação no Snowflake usando o programa Scala ou o script Python de exemplo.
Antes de usar o OAuth externo e o conector do Spark para autenticação no Snowflake, configure uma integração de segurança de OAuth externo para um dos servidores de autorização de OAuth externo compatíveis ou para um cliente personalizado de OAuth externo.
Nos exemplos de Scala e Python, observe a substituição do parâmetro sfPassword pelos parâmetros sfAuthenticator e sfToken.
Scala:
// spark connector version val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME import org.apache.spark.sql.DataFrame var sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<username>", "sfAuthenticator" -> "oauth", "sfToken" -> "<external_oauth_access_token>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) // // Create a DataFrame from a Snowflake table // val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "region") .load() // // Join, augment, aggregate, etc. the data in Spark and then use the // Data Source API to write the data back to a table in Snowflake // df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t2") .mode(SaveMode.Overwrite) .save()
Python:
from pyspark import SparkConf, SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import * sc = SparkContext("local", "Simple App") spark = SQLContext(sc) spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>') # You might need to set these sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>") sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>") # Set options below sfOptions = { "sfURL" : "<account_identifier>.snowflakecomputing.com", "sfUser" : "<user_name>", "sfAuthenticator" : "oauth", "sfToken" : "<external_oauth_access_token>", "sfDatabase" : "<database>", "sfSchema" : "<schema>", "sfWarehouse" : "<warehouse>" } SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", "select 1 as my_num union all select 2 as my_num") \ .load() df.show()
Opções do AWS para transferência de dados externa¶
Estas opções são usadas para especificar o local do Amazon S3 onde os dados temporários são armazenados e fornecer detalhes de autenticação para acessar o local. Elas são necessárias somente se você estiver fazendo uma transferência de dados externa. As transferências de dados externas são necessárias se uma das seguintes opções for verdadeira:
Você está usando a versão 2.1.x ou anterior do conector do Spark (que não aceita transferências internas), ou
Sua transferência provavelmente levará 36 horas ou mais (as transferências internas utilizam credenciais temporárias que expiram após 36 horas).
tempDirO local S3 onde os dados intermediários são armazenados (por exemplo,
s3n://xy12345-bucket/spark-snowflake-tmp/).Se
tempDirfor especificado, você também deve especificar uma destas opções:awsAccessKey,awsSecretKey. outemporary_aws_access_key_id,temporary_aws_secret_access_key,temporary_aws_session_token
awsAccessKey,awsSecretKeyEstas são credenciais AWS padrão que permitem o acesso ao local especificado em
tempDir. Observe que ambas as opções devem ser definidas em conjunto.Se forem definidas, podem ser recuperadas do objeto
SparkContextexistente.Se você especificar estas variáveis, também deverá especificar
tempDir.Estas credenciais também devem ser definidas para o cluster Hadoop.
temporary_aws_access_key_id,temporary_aws_secret_access_key,temporary_aws_session_tokenElas são credenciais AWS temporárias que permitem o acesso ao local especificado em
tempDir. Observe que todas essas três opções devem ser definidas em conjunto.Além disso, se essas opções forem definidas, terão precedência sobre as opções
awsAccessKeyeawsSecretKey.Se você especificar
temporary_aws_access_key_id,temporary_aws_secret_access_keyetemporary_aws_session_token, também deverá especificartempDir. Caso contrário, estes parâmetros são ignorados.check_bucket_configurationSe definido como
on(padrão), o conector verifica se o bucket usado para transferência de dados tem uma política de ciclo de vida configurada (consulte Preparação de um bucket AWS S3 externo para mais informações). Se não houver uma política de ciclo de vida presente, um aviso é registrado.Desabilitar essa opção (definindo-a como
off) ignora essa verificação. Isto pode ser útil se um usuário puder acessar as operações de dados do bucket, mas não as políticas de ciclo de vida. Desabilitar a opção também pode acelerar ligeiramente os tempos de execução da consulta.
Para obter mais detalhes, consulte Autenticação do S3 para troca de dados (neste tópico).
Opções do Azure para transferência de dados externa¶
Esta seção descreve os parâmetros que se aplicam ao armazenamento de blobs do Azure ao fazer transferências de dados externas. As transferências de dados externas são necessárias se uma das seguintes opções for verdadeira:
Você está usando a versão 2.1.x ou anterior do conector do Spark (que não aceita transferências internas), ou
Sua transferência provavelmente levará 36 horas ou mais (as transferências internas utilizam credenciais temporárias que expiram após 36 horas).
Ao utilizar uma transferência externa com armazenamento de blobs do Azure, você especifica a localização do contêiner do Azure e a SAS (assinatura de acesso compartilhado) para esse contêiner usando os parâmetros descritos abaixo.
tempDirO contêiner de armazenamento de blobs do Azure onde são armazenados os dados intermediários. Ele tem a forma de um URL, por exemplo:
wasb://<contêiner_do_azure>@<conta_do_azure>.<ponto_de_extremidade_do_azure>/temporary_azure_sas_tokenEspecifique o token SAS para o armazenamento de blobs do Azure.
Para obter mais detalhes, consulte Autenticação do Azure para troca de dados (neste tópico).
Especificação de informações do Azure para armazenamento temporário no Spark¶
Ao utilizar o armazenamento de blobs do Azure para fornecer armazenamento temporário para transferência de dados entre o Spark e o Snowflake, você deve fornecer ao Spark, assim como ao conector Spark do Snowflake, o local e as credenciais para o armazenamento temporário.
Para fornecer ao Spark o local de armazenamento temporário, execute comandos semelhantes aos seguintes em seu cluster do Spark:
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Observe que o último comando contém as seguintes variáveis:
<contêiner>e<conta>: são os nomes do contêiner e da conta para sua implantação do Azure.<ponto_de_extremidade_do_azure>: o ponto de extremidade de seu local de implantação do Azure. Por exemplo, se você estiver usando uma implantação US do Azure, é provável que o ponto de extremidade sejablob.core.windows.net.<azure_sas>: o token de segurança de Assinatura de Acesso Compartilhado.
Substitua cada uma dessas variáveis pelas informações adequadas para sua conta de armazenamento de blobs do Azure.
Passagem de parâmetros da sessão do Snowflake como opções para o conector¶
O conector do Snowflake para Spark aceita o envio de parâmetros arbitrários em nível de sessão ao Snowflake (consulte Parâmetros de sessão para mais informações). Isso pode ser feito adicionando-se um par ("<chave>" -> "<valor>") ao objeto options, onde <chave> é o nome do parâmetro da sessão e <valor> é o valor.
Nota
O <valor> deve ser uma cadeia de caracteres entre aspas duplas, mesmo para parâmetros que aceitam números ou valores booleanos (por exemplo, "1" ou "true").
Por exemplo, o código de exemplo a seguir passa o parâmetro de sessão USE_CACHED_RESULT com um valor de "false", que desabilita o uso dos resultados de consultas executadas anteriormente:
// ... assuming sfOptions contains Snowflake connector options
// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")
// ... now use sfOptions with the .options() method
Considerações de segurança¶
Os clientes devem garantir que em um sistema Spark com múltiplos nós as comunicações entre os nós sejam seguras. O mestre do Spark envia credenciais do Snowflake aos nós de trabalho do Spark para que eles possam acessar os estágios do Snowflake. Se as comunicações entre o mestre e os nós de trabalho do Spark não forem seguras, as credenciais poderão ser lidas por um terceiro não autorizado.
Autenticação do S3 para troca de dados¶
Esta seção descreve como autenticar ao usar o S3 para troca de dados.
Esta tarefa é necessária somente em uma das seguintes circunstâncias:
A versão do conector do Snowflake para Spark é 2.1.x (ou anterior). A partir da versão v2.2.0, o conector utiliza um estágio temporário interno do Snowflake para a troca de dados. Se você não estiver usando a versão 2.2.0 (ou superior) do conector, a Snowflake recomenda fortemente a atualização para a versão mais recente.
A versão do conector do Snowflake para Spark é 2.2.0 (ou superior), mas seus trabalhos costumam exceder 36 horas. Esta é a duração máxima do token AWS utilizado pelo conector para acessar o estágio interno para troca de dados.
Se estiver usando uma versão mais antiga do conector, você precisa preparar um local S3 que o conector possa usar para trocar dados entre o Snowflake e o Spark.
Para permitir o acesso ao bucket/diretório S3 usado para trocar dados entre o Spark e o Snowflake (como especificado para tempDir), dois métodos de autenticação são aceitos:
Credenciais permanentes do AWS (também usadas para configurar a autenticação Hadoop/Spark para acessar o S3)
Credenciais temporárias do AWS
Uso de credenciais permanentes do AWS¶
Este é o método de autenticação padrão do AWS. Ele requer um par de valores awsAccessKey e awsSecretKey.
Nota
Esses valores também precisam ser usados para configurar o Hadoop/Spark para acessar o S3. Para obter mais informações, incluindo exemplos, consulte Autenticação no Hadoop/Spark usando S3A ou S3N (neste tópico).
Por exemplo:
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>") // Then, configure your Snowflake environment // var sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<user_name>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>", "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"), "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"), "tempdir" -> "s3n://<temp-bucket-name>" )
Para detalhes sobre as opções aceitas por sfOptions, consulte Opções do AWS para transferência de dados externa (neste tópico).
Autenticação no Hadoop/Spark usando S3A ou S3N¶
Ecossistemas Hadoop/Spark aceitam 2 esquemas URI para acesso ao S3:
s3a://Novo método recomendado (para Hadoop 2.7 e superior)
Para usar este método, modifique os exemplos do Scala neste tópico para adicionar as seguintes opções de configuração do Hadoop:
val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3a.access.key", <accessKey>) hadoopConf.set("fs.s3a.secret.key", <secretKey>)
Certifique-se de que a opção
tempdirutilize tambéms3a://.s3n://Método mais antigo (para Hadoop 2.6 e anterior)
Em alguns sistemas, é necessário especificá-lo explicitamente como mostrado no exemplo do Scala a seguir:
val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>) hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)
Uso de credenciais temporárias do AWS¶
Este método usa as opções de configuração temporary_aws_access_key_id, temporary_aws_secret_access_key e temporary_aws_session_token para o conector.
Este método oferece segurança adicional fornecendo ao Snowflake apenas acesso temporário ao bucket/diretório S3 utilizado para troca de dados.
Nota
As credenciais temporárias só podem ser usadas para configurar a autenticação S3 para o conector; elas não podem ser usadas para configurar a autenticação Hadoop/Spark.
Além disso, se você fornecer credenciais temporárias, elas têm precedência sobre quaisquer credenciais permanentes que tenham sido fornecidas.
O código do Scala a seguir fornece um exemplo de autenticação usando credenciais temporárias:
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest import net.snowflake.spark.snowflake.Parameters // ... val sts_client = new AWSSecurityTokenServiceClient() val session_token_request = new GetSessionTokenRequest() // Set the token duration to 2 hours. session_token_request.setDurationSeconds(7200) val session_token_result = sts_client.getSessionToken(session_token_request) val session_creds = session_token_result.getCredentials() // Create a new set of Snowflake connector options, based on the existing // sfOptions definition, with additional temporary credential options that override // the credential options in sfOptions. // Note that constants from Parameters are used to guarantee correct // key names, but literal values, such as temporary_aws_access_key_id are, of course, // also allowed. var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId()) sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey()) sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())
sfOptions2 pode agora ser usado com o método de DataFrame options().
Autenticação do Azure para troca de dados¶
Esta seção descreve como autenticar ao utilizar o armazenamento de blobs do Azure para troca de dados.
A autenticação desta forma é necessária somente em uma das seguintes circunstâncias:
A versão do conector do Snowflake para Spark é 2.1.x (ou anterior). A partir da versão v2.2.0, o conector utiliza um estágio temporário interno do Snowflake para a troca de dados. Se você não estiver usando a versão 2.2.0 (ou superior) do conector, a Snowflake recomenda fortemente a atualização para a versão mais recente.
A versão do conector do Snowflake para Spark é 2.2.0 (ou superior), mas seus trabalhos costumam exceder 36 horas. Esta é a duração máxima do token Azure utilizado pelo conector para acessar o estágio interno de troca de dados.
Você precisa preparar um contêiner de armazenamento de blobs do Azure que o conector possa usar para trocar dados entre o Snowflake e o Spark.
Uso de credenciais do Azure¶
Este é o método de autenticação padrão do armazenamento de blobs do Azure. Ele requer um par de valores: tempDir (um URL) e valores temporary_azure_sas_token.
Nota
Estes valores também precisam ser usados para configurar o Hadoop/Spark para acessar o armazenamento de blobs do Azure. Para obter mais informações, incluindo exemplos, consulte Autenticação no Hadoop/Spark usando Azure (neste tópico).
Por exemplo:
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>) // Then, configure your Snowflake environment // val sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<user_name>", "sfPassword" -> "<password>", "sfDatabase" -> "<database_name>", "sfSchema" -> "<schema_name>", "sfWarehouse" -> "<warehouse_name>", "sfCompress" -> "on", "sfSSL" -> "on", "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/", "temporary_azure_sas_token" -> "<azure_sas>" )
Para detalhes sobre as opções aceitas por sfOptions, consulte Opções do Azure para transferência de dados externa (neste tópico).
Autenticação no Hadoop/Spark usando Azure¶
Para usar este método, modifique os exemplos do Scala neste tópico para adicionar as seguintes opções de configuração do Hadoop:
val hadoopConf = sc.hadoopConfiguration sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Certifique-se de que a opção tempdir utilize também wasb://.
A autenticação através de um navegador não é aceita¶
Ao usar o conector do Spark, é impraticável usar qualquer forma de autenticação que abra uma janela do navegador para pedir credenciais ao usuário. A janela não apareceria necessariamente na máquina do cliente. Portanto, o conector do Spark não aceita nenhum tipo de autenticação, incluindo MFA (autenticação multifator) ou SSO (login único), que chame uma janela do navegador.