Uso do conector do Spark

O conector é compatível com a API padrão do Spark, mas com a adição de opções específicas do Snowflake, que são descritas neste tópico.

Neste tópico, o termo COPY refere-se a:

  • COPY INTO <tabela> (usado para transferir dados de um estágio interno ou externo para uma tabela).

  • COPY INTO <local> (usado para transferir dados de uma tabela para um estágio interno ou externo).

Neste tópico:

Verificação da conexão da rede ao Snowflake com SnowCD

Após configurar seu driver, você pode avaliar e solucionar problemas de conectividade de rede com o Snowflake usando o SnowCD.

Você pode usar o SnowCD durante o processo de configuração inicial e sob demanda a qualquer momento para avaliar e solucionar problemas de sua conexão de rede ao Snowflake.

Pushdown

O conector do Spark aplica o pushdown de predicado e consulta capturando e analisando os planos lógicos do Spark para operações SQL. Quando a fonte de dados é o Snowflake, as operações são convertidas em uma consulta SQL e depois executadas no Snowflake para melhorar o desempenho.

Entretanto, como essa precisa ser quase uma conversão um-para-um dos operadores SQL do Spark para expressões do Snowflake, nem todos os operadores SQL do Spark podem ser otimizados. Quando o pushdown falha, o conector volta para um plano de execução menos otimizado. As operações sem suporte são realizadas no Spark.

Nota

Se você precisar de pushdown para todas as operações, considere escrever seu código para usar API Snowpark em seu lugar.

Abaixo está uma lista das operações com suporte para pushdown (todas as funções abaixo usam seus nomes Spark). Se uma função não estiver nesta lista, um plano do Spark que a utilize pode ser executado no Spark em vez de ser enviado ao Snowflake.

  • Funções de agregação

    • Média

    • Corr

    • CovPopulation

    • CovSample

    • Contagem

    • Max

    • Min

    • StddevPop

    • StddevSamp

    • Soma

    • VariancePop

    • VarianceSamp

  • Operadores booleanos

    • E

    • Between

    • Contains

    • EndsWith

    • EqualTo

    • GreaterThan

    • GreaterThanOrEqual

    • In

    • IsNull

    • IsNotNull

    • LessThan

    • LessThanOrEqual

    • Not

    • Ou

    • StartsWith

  • Funções de data, hora e carimbo de data/hora

    • DateAdd

    • DateSub

    • Month

    • Quarter

    • TruncDate

    • TruncTimestamp

    • Ano

  • Funções matemáticas

    • Operadores aritméticos “+” (adição), “-” (subtração), “*” (multiplicação), “/” (divisão) e “-” (negação unária).

    • Abs

    • Acos

    • Asin

    • Atan

    • Ceil

    • CheckOverflow

    • Cos

    • Cosh

    • Exp

    • Floor

    • Greatest

    • Least

    • Log

    • Pi

    • Pow

    • PromotePrecision

    • Rand

    • Round

    • Sin

    • Sinh

    • Sqrt

    • Tan

    • Tanh

  • Operadores diversos

    • Alias (expressões AS)

    • BitwiseAnd

    • BitwiseNot

    • BitwiseOr

    • BitwiseXor

    • CaseWhen

    • Cast(child, t, _)

    • Coalesce

    • If

    • MakeDecimal

    • ScalarSubquery

    • ShiftLeft

    • ShiftRight

    • SortOrder

    • UnscaledValue

  • Operadores relacionais

    • Funções agregadas e cláusulas group-by

    • Distinct

    • Filtros

    • In

    • InSet

    • Junções

    • Limites

    • Projeções

    • Classificações (ORDER BY)

    • Union e Union All

    • Funções de janela e cláusulas de windowing

  • Funções de cadeia de caracteres

    • Ascii

    • Concat(children)

    • Length

    • Like

    • Lower

    • StringLPad

    • StringRPad

    • StringTranslate

    • StringTrim

    • StringTrimLeft

    • StringTrimRight

    • Substring

    • Upper

  • Funções de janela (nota: não funcionam com o Spark 2.2)

    • DenseRank

    • Rank

    • RowNumber

Uso do conector no Scala

Especificação do nome da classe de fonte de dados

Para usar o Snowflake como fonte de dados no Spark, use a opção .format para fornecer o nome da classe do conector do Snowflake que define a fonte de dados.

net.snowflake.spark.snowflake

Para garantir uma verificação no tempo de compilação do nome da classe, a Snowflake recomenda fortemente a definição de uma variável para o nome da classe. Por exemplo:

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
Copy

Além disso, por conveniência, a classe Utils fornece a variável, que pode ser importada da seguinte forma:

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
Copy

Nota

Todos os exemplos neste tópico utilizam SNOWFLAKE_SOURCE_NAME como a definição de classe.

Habilitação/desabilitação do pushdown em uma sessão

A versão 2.1.0 (e superior) do conector é compatível com o pushdown de consulta, que pode melhorar significativamente o desempenho enviando o processamento da consulta para o Snowflake quando ele é a fonte de dados do Spark.

Por padrão, o pushdown está ativado.

Para desabilitar o pushdown em uma sessão do Spark para um determinado DataFrame:

  1. Após instanciar um objeto SparkSession, chame o método estático SnowflakeConnectorUtils.disablePushdownSession, passando o objeto SparkSession. Por exemplo:

    SnowflakeConnectorUtils.disablePushdownSession(spark)
    
    Copy

    Onde spark é seu objeto SparkSession.

  2. Crie um DataFrame com a opção autopushdown definida como off. Por exemplo:

    val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("query", query)
      .option("autopushdown", "off")
      .load()
    
    Copy

    Observe que você também pode definir a opção autopushdown em um Map que você passa para o método options (por exemplo, em sfOptions no exemplo acima).

Para habilitar o pushdown novamente após desativá-lo, chame o método estático SnowflakeConnectorUtils.enablePushdownSession (passando o objeto SparkSession) e crie um DataFrame com autopushdown habilitado.

Movimentação de dados do Snowflake para o Spark

Nota

Ao utilizar DataFrames, o conector do Snowflake aceita somente consultas SELECT.

Para ler dados do Snowflake em umDataFrame do Spark:

  1. Use o método read() do objeto SqlContext para construir um DataFrameReader.

  2. Especifique SNOWFLAKE_SOURCE_NAME usando o método format(). Para a definição, consulte Especificação do nome da classe de fonte de dados (neste tópico).

  3. Especifique as opções do conector usando o método option() ou options(). Para obter mais informações, consulte Definição das opções de configuração para o conector (neste tópico).

  4. Especifique uma das seguintes opções para os dados da tabela a serem lidos:

    • dbtable: nome da tabela a ser lida. Todas as colunas e registros são recuperados (ou seja, é equivalente a SELECT * FROM db_table).

    • query: consulta exata (instrução SELECT) a ser executada.

Notas de uso

  • Atualmente, o conector não aceita outros tipos de consultas (por exemplo, SHOW ou DESC, ou instruções DML) ao usar DataFrames.

  • Há um limite máximo para o tamanho de uma linha individual. Para obter mais detalhes, consulte Limites no tamanho do texto de consulta.

Considerações de desempenho

Ao transferir dados entre o Snowflake e o Spark, use os seguintes métodos para analisar/melhorar o desempenho:

  • Use o método net.snowflake.spark.snowflake.Utils.getLastSelect() para ver a consulta real emitida ao mover os dados do Snowflake para o Spark.

  • Se usar a funcionalidade filter ou where do DataFrame do Spark, verifique se os respectivos filtros estão presentes na consulta SQL emitida. O conector do Snowflake tenta converter todos os filtros solicitados pelo Spark para SQL.

    No entanto, existem formas de filtros que a infraestrutura do Spark hoje não passa para o conector do Snowflake. Como resultado, em algumas situações, um grande número de registros desnecessários é solicitado ao Snowflake.

  • Se você precisar de apenas um subconjunto de colunas, certifique-se de indicar o subconjunto na consulta SQL.

  • Em geral, se a consulta SQL emitida não corresponder ao que você espera com base nas operações do DataFrame, use a opção query para fornecer a sintaxe SQL exata que deseja.

Exemplos

Ler uma tabela inteira:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()
Copy

Ler os resultados de uma consulta:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
    .load()
Copy

Movimentação de dados do Spark para o Snowflake

As etapas para salvar o conteúdo de um DataFrame em uma tabela do Snowflake são semelhantes a gravar do Snowflake no Spark:

  1. Use o método write() do DataFrame para construir um DataFrameWriter.

  2. Especifique SNOWFLAKE_SOURCE_NAME usando o método format(). Para a definição, consulte Especificação do nome da classe de fonte de dados (neste tópico).

  3. Especifique as opções do conector usando o método option() ou options(). Para obter mais informações, consulte Definição das opções de configuração para o conector (neste tópico).

  4. Use a opção dbtable para especificar a tabela na qual os dados são gravados.

  5. Use o método mode() para especificar o modo de salvamento do conteúdo.

    Para obter mais informações, consulte SaveMode (documentação do Spark).

Exemplos

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Exportação de JSON do Spark para o Snowflake

DataFrames do Spark podem conter objetos JSON, serializados como cadeias de caracteres. O seguinte código fornece um exemplo de conversão de um DataFrame regular em um DataFrame contendo dados JSON:

val rdd = myDataFrame.toJSON
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)
Copy

Observe que o jsonDataFrame resultante contém uma única coluna do tipo StringType. Como resultado, quando esse DataFrame é exportado para o Snowflake com o modo SaveMode.Overwrite comum, uma nova tabela no Snowflake é criada com uma única coluna do tipo VARCHAR.

Para carregar jsonDataFrame em uma coluna VARIANT:

  1. Crie uma tabela do Snowflake (conectando-se ao Snowflake em Java usando o driver JDBC do Snowflake). Para explicações sobre os parâmetros de conexão usados no exemplo, consulte Referência dos parâmetros de conexão do driver JDBC.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class SnowflakeJDBCExample {
      public static void main(String[] args) throws Exception {
        String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.com/";
    
        Properties properties = new Properties();
        properties.put("user", "peter");
        properties.put("password", "test");
        properties.put("account", "myorganization-myaccount");
        properties.put("warehouse", "mywh");
        properties.put("db", "mydb");
        properties.put("schema", "public");
    
        // get connection
        System.out.println("Create JDBC connection");
        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
        System.out.println("Done creating JDBC connection\n");
        // create statement
        System.out.println("Create JDBC statement");
        Statement statement = connection.createStatement();
        System.out.println("Done creating JDBC statement\n");
        // create a table
        System.out.println("Create my_variant_table table");
        statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
        statement.close();
        System.out.println("Done creating demo table\n");
    
        connection.close();
        System.out.println("Close connection\n");
      }
    }
    
    Copy
  2. Em vez de usar SaveMode.Overwrite, use SaveMode.Append para reutilizar a tabela existente. Quando o valor da cadeia de caracteres que representa JSON é carregado no Snowflake, porque a coluna de destino é do tipo VARIANT, ela é analisada como JSON. Por exemplo:

    df.write
        .format(SNOWFLAKE_SOURCE_NAME)
        .options(sfOptions)
        .option("dbtable", "my_variant_table")
        .mode(SaveMode.Append)
        .save()
    
    Copy

Execução de instruções SQL DDL/DML

Use o método runQuery() do objeto Utils para executar instruções SQL DDL/DML, além de consultas, por exemplo:

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
    )
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")
Copy

onde sfOptions é o mapa de parâmetros usado para ler/gravar DataFrames.

O método runQuery retorna apenas TRUE ou FALSE. Destina-se a instruções que não retornam um conjunto de resultados; por exemplo instruções DDL como CREATE TABLE e instruções DML como INSERT, UPDATE e DELETE. Não é útil para instruções que retornam um conjunto de resultados, tais como SELECT ou SHOW.

Como trabalhar com carimbos de data/hora e fusos horários

O Spark fornece apenas um tipo de carimbo de data/hora, equivalente ao tipo Timestamp do Scala/Java. É quase idêntico em comportamento ao tipo de dados TIMESTAMP_LTZ (fuso horário local) do Snowflake. Como tal, ao transferir dados entre o Spark e o Snowflake, a Snowflake recomenda utilizar as seguintes abordagens para preservar a hora corretamente, em relação aos fusos horários:

  • Use somente o tipo de dados TIMESTAMP_LTZ no Snowflake.

    Nota

    O mapeamento padrão do tipo de dados de carimbo de data/hora é TIMESTAMP_NTZ (sem fuso horário), então você precisa definir explicitamente o parâmetro TIMESTAMP_TYPE_MAPPING para usar TIMESTAMP_LTZ.

  • Defina o fuso horário do Spark como UTC e use esse fuso horário no Snowflake (ou seja, não defina a opção sfTimezone para o conector e não defina explicitamente um fuso horário no Snowflake). Neste cenário, TIMESTAMP_LTZ e TIMESTAMP_NTZ são efetivamente equivalentes.

    Para definir o fuso horário, adicione a seguinte linha ao seu código Spark:

    java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
    
    Copy

Se você não implementar uma dessas abordagens, podem ocorrer modificações de hora indesejadas. Por exemplo, considere o seguinte cenário:

  • O fuso horário do Spark está definido como America/New_York.

  • O fuso horário do Snowflake está definido como Europe/Warsaw, o que pode ser feito de uma destas formas:

    • Definir sfTimezone como Europe/Warsaw para o conector.

    • Definir sfTimezone como snowflake para o conector e definir o parâmetro da sessão TIMEZONE no Snowflake como Europe/Warsaw.

  • Tanto TIMESTAMP_NTZ como TIMESTAMP_LTZ estão em uso no Snowflake.

Neste cenário:

  1. Se um valor representando 12:00:00 em uma coluna TIMESTAMP_NTZ no Snowflake for enviado para o Spark, este valor não traz nenhuma informação de fuso horário. O Spark trata o valor como 12:00:00 em Nova Iorque.

  2. Se o Spark envia este valor 12:00:00 (em Nova Iorque) de volta para o Snowflake para ser carregado em uma coluna TIMESTAMP_LTZ, ele é automaticamente convertido e carregado como 18:00:00 (para o fuso horário de Varsóvia).

  3. Se este valor for então convertido para TIMESTAMP_NTZ no Snowflake, o usuário vê 18:00:00, que é diferente do valor original, 12:00:00.

Para resumir, a Snowflake recomenda seguir estritamente pelo menos uma destas regras:

  • Use o mesmo fuso horário, idealmente UTC, tanto para o Spark como para o Snowflake.

  • Utilize somente o tipo de dados TIMESTAMP_LTZ para transferir dados entre o Spark e o Snowflake.

Exemplo de programa Scala

Importante

Este programa de exemplo presume que você está usando a versão 2.2.0 (ou superior) do conector, que usa um estágio interno do Snowflake para armazenar dados temporários e, portanto, não requer um local S3 para armazenamento de dados temporários. Se você estiver usando uma versão anterior, é preciso ter um local S3 e incluir valores para tempdir, awsAccessKey, awsSecretKey para sfOptions. Para obter mais detalhes, consulte Opções do AWS para transferência de dados externa (neste tópico).

O programa Scala a seguir fornece um caso de uso completo para o conector do Snowflake para Spark. Antes de usar o código, substitua as seguintes cadeias de caracteres pelos valores apropriados, conforme descrito em Definição das opções de configuração para o conector (neste tópico):

  • <identificador_da_conta>: seu identificador de conta.

  • <nome_de_usuário> <senha>: credenciais de login para o usuário do Snowflake.

  • <banco de dados> , <esquema> <warehouse>: padrões para a sessão do Snowflake.

O programa Scala de exemplo usa autenticação básica (ou seja, nome de usuário e senha). Se quiser autenticar com OAuth, consulte Uso de OAuth externo (neste tópico).

import org.apache.spark.sql._

//
// Configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) from t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Uso do conector com Python

Usar o conector com Python é muito semelhante ao uso com Scala.

Recomendamos utilizar o script bin/pyspark incluído na distribuição Spark.

Configuração do script pyspark

O script pyspark deve ser configurado de forma semelhante ao script spark-shell, usando-se as opções --packages ou --jars. Por exemplo:

bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3
Copy

Não se esqueça de incluir o conector Spark do Snowflake e os arquivos .jar do conector JDBC em sua variável de ambiente CLASSPATH.

Para obter mais informações sobre a configuração do script spark-shell, consulte Etapa 4: Configurar o cluster Spark local ou ambiente Spark hospedado no Amazon EMR.

Habilitação/desabilitação do pushdown em uma sessão

A versão 2.1.0 (e superior) do conector é compatível com o pushdown de consulta, que pode melhorar significativamente o desempenho enviando o processamento da consulta para o Snowflake quando ele é a fonte de dados do Spark.

Por padrão, o pushdown está ativado.

Para desabilitar o pushdown em uma sessão do Spark para um determinado DataFrame:

  1. Após instanciar um objeto SparkSession, chame o método estático SnowflakeConnectorUtils.disablePushdownSession, passando o objeto SparkSession. Por exemplo:

    sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
    
    Copy
  2. Crie um DataFrame com a opção autopushdown definida como off. Por exemplo:

    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sfOptions) \
      .option("query",  query) \
      .option("autopushdown", "off") \
      .load()
    
    Copy

    Observe que você também pode definir a opção autopushdown em um Dictionary que você passa para o método options (por exemplo, em sfOptions no exemplo acima).

Para habilitar o pushdown novamente após desativá-lo, chame o método estático SnowflakeConnectorUtils.enablePushdownSession (passando o objeto SparkSession) e crie um DataFrame com autopushdown habilitado.

Script Python de exemplo

Importante

Este script de exemplo presume que você está usando a versão 2.2.0 (ou superior) do conector, que usa um estágio interno do Snowflake para armazenar dados temporários e, portanto, não requer um local S3 para armazenar estes dados. Se você estiver usando uma versão anterior, é preciso ter um local S3 e incluir valores para tempdir, awsAccessKey, awsSecretKey para sfOptions. Para obter mais detalhes, consulte Opções do AWS para transferência de dados externa (neste tópico).

Uma vez configurado o script pyspark, você pode realizar consultas SQL e outras operações. Aqui está um exemplo de script Python que realiza uma consulta SQL simples. Este script ilustra o uso básico de conectores. A maioria dos exemplos do Scala neste documento pode ser adaptada com um mínimo de esforço/alterações para uso com Python.

O script Python de exemplo usa autenticação básica (ou seja, nome de usuário e senha). Se quiser autenticar com OAuth, consulte Uso de OAuth externo (neste tópico).

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()
Copy

Dica

Observe o uso de sfOptions e SNOWFLAKE_SOURCE_NAME. Isto simplifica o código e reduz a chance de erros.

Para detalhes sobre as opções aceitas para sfOptions, consulte Definição das opções de configuração para o conector (neste tópico).

Mapeamentos de tipos de dados

O conector do Spark é compatível com a conversão entre muitos tipos de dados comuns.

Do SQL do Spark ao Snowflake

Tipo de dados do Spark

Tipo de dados do Snowflake

ArrayType

VARIANT

BinaryType

Sem suporte

BooleanType

BOOLEAN

ByteType

INTEGER. O Snowflake não tem suporte para o tipo BYTE.

DateType

DATE

DecimalType

DECIMAL

DoubleType

DOUBLE

FloatType

FLOAT

IntegerType

INTEGER

LongType

INTEGER

MapType

VARIANT

ShortType

INTEGER

StringType

Se o comprimento for especificado, VARCHAR(N); caso contrário, VARCHAR

StructType

VARIANT

TimestampType

TIMESTAMP

Do Snowflake para o SQL do Spark

Tipo de dados do Snowflake

Tipo de dados do Spark

ARRAY

StringType

BIGINT

DecimalType(38, 0)

BINARY

Sem suporte

BLOB

Sem suporte

BOOLEAN

BooleanType

CHAR

StringType

CLOB

StringType

DATE

DateType

DECIMAL

DecimalType

DOUBLE

DoubleType

FLOAT

DoubleType

INTEGER

DecimalType(38, 0)

OBJECT

StringType

TIMESTAMP

TimestampType

TIME

StringType (Versão do conector do Spark 2.4.14 ou posterior)

VARIANT

StringType

Chamada do método DataFrame.show

Se você estiver chamando o método DataFrame.show e passando em um número menor que o número de linhas no DataFrame, construa um DataFrame que contenha apenas as linhas a serem mostradas de forma ordenada.

Para fazer isso:

  1. Chame primeiro o método sort para retornar um DataFrame que contenha linhas ordenadas.

  2. Chame o método limit naquele DataFrame para retornar um DataFrame que contenha apenas as linhas que você deseja mostrar.

  3. Chame o método show no DataFrame retornado.

Por exemplo, se você quiser mostrar 5 linhas e ter os resultados ordenados pela coluna my_col:

val dfWithRowsToShow = originalDf.sort("my_col").limit(5)
dfWithRowsToShow.show(5)
Copy

Caso contrário, se você chamar show para exibir um subconjunto de linhas no DataFrame, diferentes execuções do código podem resultar na exibição de diferentes linhas.

Definição das opções de configuração para o conector

As seções seguintes listam as opções que você define para configurar o comportamento do conector:

Para definir estas opções, chame o método .option(<valor>, <de chave>) ou .options(<mapa>) da classe Spark DataframeReader.

Dica

Para facilitar o uso das opções, Snowflake recomenda especificar as opções em um único objeto Map e chamar .options(<mapa>) para definir as opções.

Opções de conexão necessárias

As seguintes opções são necessárias para a conexão com o Snowflake:

sfUrl

Especifica o nome de host de sua conta no seguinte formato:

account_identifier.snowflakecomputing.com

account_identifier é seu identificador da conta.

sfUser

Nome de login para o usuário do Snowflake.

Observe que você deve usar uma das seguintes opções para autenticar:

  • sfPassword

    Senha para o usuário do Snowflake.

  • pem_private_key

    Chave privada (em formato PEM) para autenticação do par de chaves. Para obter instruções, consulte Autenticação de pares de chaves e rotação de pares de chaves.

  • sfAuthenticator

    Especifica o uso de OAuth externo para autenticar no Snowflake. Defina o valor como oauth.

    A utilização do parâmetro OAuth externo requer a definição do parâmetro sfToken.

sfToken

(obrigatório se estiver usando OAuth externo) Definir o valor para o token de acesso do OAuth externo.

Este parâmetro de conexão requer o definir o valor do parâmetro sfAuthenticator como oauth.

O padrão é nenhum.

Opções de contexto necessárias

As seguintes opções são necessárias para definir o contexto de banco de dados e esquema para a sessão:

sfDatabase

O banco de dados a ser utilizado para a sessão após a conexão.

sfSchema

O esquema a ser usado para a sessão após a conexão.

Opções adicionais de contexto

As opções listadas nesta seção não são necessárias.

sfAccount

Identificador da conta (por exemplo, myorganization-myaccount). Esta opção não é mais necessária porque o identificador da conta é especificado em sfUrl. Está documentada aqui apenas para compatibilidade retroativa.

sfWarehouse

O warehouse virtual padrão a ser utilizado para a sessão após a conexão.

sfRole

A função de segurança padrão a ser usada para a sessão após a conexão.

Opções de proxy

As opções listadas nesta seção não são necessárias.

use_proxy

Especifica se o conector deve usar um proxy:

  • true especifica que o conector deve usar um proxy.

  • false especifica que o conector não deve usar um proxy.

O valor padrão é false.

proxy_host

(Obrigatório se use_proxy for true) Especifica o nome de host do servidor proxy a ser utilizado.

proxy_port

(Obrigatório se use_proxy for true) Especifica o número da porta do servidor proxy a ser utilizada.

proxy_protocol

Especifica o protocolo usado para se conectar ao servidor proxy. Especifique um dos seguintes valores:

  • http

  • https

O valor padrão é http.

Isto só é suportado para Snowflake em AWS.

Esta opção foi adicionada na versão 2.11.1 do Spark Connector.

proxy_user

Especifica o nome do usuário para autenticação no servidor proxy. Defina isto se o servidor proxy exigir autenticação.

Isto só é suportado para Snowflake em AWS.

proxy_password

Especifica a senha de proxy_user para autenticação no servidor proxy. Defina isto se o servidor proxy exigir autenticação.

Isto só é suportado para Snowflake em AWS.

non_proxy_hosts

Especifica a lista de hosts aos quais o conector deve se conectar diretamente, ignorando o servidor proxy.

Separe os nomes dos hosts com um símbolo de barra vertical com um caractere de escape na URL(%7C). Você também pode usar um asterisco (*) como curinga.

Isto só é suportado para Snowflake em AWS.

Opções adicionais

As opções listadas nesta seção não são necessárias.

sfTimezone

O fuso horário a ser utilizado pelo Snowflake ao trabalhar com o Spark. Observe que o parâmetro só define o fuso horário no Snowflake; o ambiente do Spark permanece inalterado. Os valores aceitos são:

  • spark: use o fuso horário do Spark (padrão).

  • snowflake: use o fuso horário atual para o Snowflake.

  • sf_default: use o fuso horário padrão para o usuário do Snowflake que está se conectando.

  • time_zone: use um fuso horário específico (por exemplo, America/New_York), se válido.

    Para obter mais informações sobre o impacto da definição desta opção, consulte Como trabalhar com carimbos de data/hora e fusos horários (neste tópico).

sfCompress

Se definido como on (padrão), os dados passados entre o Snowflake e o Spark são compactados.

s3MaxFileSize

O tamanho de arquivo usado ao mover dados do Snowflake para o Spark. O padrão é 10MB.

preactions

Uma lista separada por ponto e vírgula de comandos SQL que são executados antes que os dados sejam transferidos entre o Spark e o Snowflake.

Se um comando SQL contém %s, o %s é substituído pelo nome da tabela referenciada para a operação.

postactions

Uma lista separada por ponto e vírgula de comandos SQL que são executados depois que os dados são transferidos entre o Spark e o Snowflake.

Se um comando SQL contém %s, ele é substituído pelo nome da tabela referenciada para a operação.

truncate_columns

Se definido como on (padrão), um comando COPY trunca automaticamente as cadeias de texto que excedem o comprimento da coluna de destino. Se definido como off, o comando produz um erro se uma cadeia de caracteres carregada exceder o comprimento da coluna de destino.

truncate_table

Este parâmetro controla se o Snowflake retém o esquema de uma tabela de destino do Snowflake ao sobrescrever essa tabela.

Por padrão, quando uma tabela de destino no Snowflake é sobregravada, o esquema dessa tabela de destino também é sobregravado; o novo esquema é baseado no esquema da tabela de origem (o dataframe do Spark).

No entanto, às vezes o esquema da origem não é o ideal. Por exemplo, um usuário pode querer que uma tabela de destino do Snowflake seja capaz de armazenar valores FLOAT no futuro, mesmo que o tipo de dados da coluna inicial da fonte seja INTEGER. Nesse caso, o esquema da tabela do Snowflake não deve ser sobregravado; a tabela do Snowflake deve apenas ser truncada e depois reaproveitada com seu esquema atual.

Os valores possíveis deste parâmetro são:

  • on

  • off

Se este parâmetro for on, o esquema original da tabela de destino será mantido. Se este parâmetro for off, então o antigo esquema da tabela será ignorado e um novo esquema será gerado com base no esquema da origem.

Este parâmetro é opcional.

O valor padrão deste parâmetro é off (ou seja, por padrão o esquema original da tabela é sobregravado).

Para detalhes sobre o mapeamento dos tipos de dados do Spark para os tipos de dados do Snowflake (e vice-versa), consulte: Mapeamentos de tipos de dados (neste tópico).

continue_on_error

Esta variável controla se o comando COPY é anulado se o usuário inserir dados inválidos (por exemplo, um formato JSON inválido para uma coluna de tipo de dados variante).

Os valores possíveis são:

  • on

  • off

O valor on significa continuar mesmo que ocorra um erro. O valor off significa cancelar se ocorrer um erro.

Este parâmetro é opcional.

O valor padrão deste parâmetro é off.

Não é recomendável ativar esta opção. Se qualquer erro for relatado enquanto COPYing no Snowflake com o conector do Spark, é provável que isso resulte em dados ausentes.

Nota

Se linhas forem rejeitadas ou faltarem, e essas linhas não estiverem claramente defeituosas na origem de entrada, informe a Snowflake.

usestagingtable

Este parâmetro controla se o carregamento de dados utiliza uma tabela de preparação.

Uma tabela de preparação é uma tabela normal (com um nome temporário) que é criada pelo conector; se a operação de carregamento de dados for bem-sucedida, a tabela de destino original é descartada e a tabela de preparação é renomeada com o nome da tabela de destino original. Se a operação de carregamento de dados falhar, a tabela de preparação é descartada e a tabela de destino permanece com os dados que tinha imediatamente antes da operação. Assim, a tabela de preparação permite que os dados originais da tabela de destino sejam mantidos se a operação falhar. Por segurança, a Snowflake recomenda fortemente o uso de uma tabela de preparação na maioria das circunstâncias.

Para que o conector possa criar uma tabela de preparação, o usuário que executa COPY através do conector do Spark deve ter privilégios suficientes para criar uma tabela. O carregamento direto (ou seja, o carregamento sem utilizar uma tabela de preparação) é útil se o usuário não tiver permissão para criar uma tabela.

Os valores possíveis deste parâmetro são:

  • on

  • off

Se o parâmetro for on, é utilizada uma tabela de preparação. Se este parâmetro for off, então os dados são carregados diretamente na tabela de destino.

Este parâmetro é opcional.

O valor padrão deste parâmetro é on (ou seja, use uma tabela de preparação).

autopushdown

Este parâmetro controla se o pushdown automático de consultas está habilitado.

Se o pushdown estiver habilitado, quando uma consulta for executada no Spark e se parte da consulta puder ser enviada para o servidor Snowflake, ela será enviada. Isto melhora o desempenho de algumas consultas.

Este parâmetro é opcional.

O valor padrão é on se o conector estiver conectado a uma versão compatível do Spark. Caso contrário, o valor padrão é off.

Se o conector estiver vinculado a uma versão diferente do Spark (por exemplo, se a versão 3.2 do conector estiver vinculada à versão 3.3 do Spark), então o pushdown automático é desabilitado mesmo se este parâmetro estiver definido como on.

purge

Se estiver definido como on, o conector exclui arquivos temporários criados durante a transferência do Spark para o Snowflake via transferência externa de dados. Se este parâmetro estiver definido como off, esses arquivos não serão automaticamente excluídos pelo conector.

A limpeza funciona somente para transferências do Spark para o Snowflake, e não para transferências do Snowflake para o Spark.

Os valores possíveis são

  • on

  • off

O valor padrão é off.

columnmap

Este parâmetro é útil ao gravar dados do Spark no Snowflake quando os nomes das colunas na tabela do Snowflake não correspondem aos nomes das colunas na tabela do Spark. Você pode criar um mapa que indica qual coluna de origem do Spark corresponde a cada coluna de destino do Snowflake.

O parâmetro é um literal de cadeia de caracteres única, na forma de:

"Map(col_2 -> col_b, col_3 -> col_a)"

Por exemplo, considere o seguinte cenário:

  • Um dataframe chamado df no Spark tem três colunas:

    col_1, col_2, col_3

  • Uma tabela chamada tb no Snowflake tem duas colunas:

    col_a , col_b

  • Você quer copiar os seguintes valores:

    • De df.col_2 para tb.col_b.

    • De df.col_3 para tb.col_a.

O valor do parâmetro columnmap seria:

Map(col_2 -> col_b, col_3 -> col_a)

Você pode gerar este valor executando o seguinte código do Scala:

Map("col_2"->"col_b","col_3"->"col_a").toString()

O valor padrão deste parâmetro é nulo. Em outras palavras, por padrão, os nomes das colunas nas tabelas de origem e destino devem corresponder.

Este parâmetro é usado somente quando se grava do Spark no Snowflake; ele não se aplica quando se grava do Snowflake no Spark.

keep_column_case

Ao gravar uma tabela do Spark no Snowflake, o conector do Spark, por padrão, muda as letras nos nomes das colunas para maiúsculas, a menos que os nomes das colunas estejam entre aspas duplas.

Ao gravar uma tabela do Snowflake no Spark, o conector do Spark, por padrão, adiciona aspas duplas em torno de qualquer nome de coluna que contenha quaisquer caracteres exceto letras maiúsculas, sublinhados e dígitos.

Se você definir keep_column_case como on, o conector do Spark não fará estas mudanças.

Os valores possíveis são:

  • on

  • off

O valor padrão é off.

column_mapping

O conector deve mapear colunas do dataframe do Spark para a tabela do Snowflake. Isso pode ser feito com base nos nomes das colunas (independentemente da ordem), ou com base na ordem das colunas (ou seja, a primeira coluna do dataframe é mapeada para a primeira coluna na tabela, independentemente do nome da coluna).

Por padrão, o mapeamento é feito com base na ordem. Você pode substituir isso definindo este parâmetro como name, o que diz ao conector para mapear colunas com base nos nomes das colunas. (O mapeamento de nomes não faz distinção entre maiúsculas e minúsculas).

Os valores possíveis deste parâmetro são:

  • order

  • name

O valor padrão é order.

column_mismatch_behavior

Este parâmetro se aplica somente quando o parâmetro column_mapping é definido como name.

Se os nomes das colunas no dataframe do Spark e na tabela do Snowflake não corresponderem, então:

  • Se column_mismatch_behavior for error, o conector do Spark reporta um erro.

  • Se column_mismatch_behavior for ignore, o conector do Spark ignora o erro.

    • O driver descarta qualquer coluna no dataframe do Spark que não tenha uma coluna correspondente na tabela do Snowflake.

    • O driver insere NULLs em qualquer coluna da tabela do Snowflake que não tenha uma coluna correspondente no dataframe do Spark.

Os erros em potencial incluem:

  • O dataframe do Spark pode conter colunas que são idênticas, exceto quanto a letras maiúsculas/minúsculas. Como o mapeamento de nomes de colunas não diferencia maiúsculas de minúsculas, não é possível determinar o mapeamento correto desde o dataframe até a tabela.

  • A tabela do Snowflake pode conter colunas que são idênticas, exceto quanto a letras maiúsculas/minúsculas. Como o mapeamento de nomes de colunas não diferencia maiúsculas de minúsculas, não é possível determinar o mapeamento correto desde o dataframe até a tabela.

  • O dataframe do Spark e a tabela do Snowflake podem não ter nomes de colunas em comum. Em teoria, o conector do Spark poderia inserir NULLs em cada coluna de cada linha, mas isso geralmente é inútil, então o conector lança um erro mesmo que o column_mismatch_behavior esteja definido como ignore.

Os valores possíveis deste parâmetro são:

  • error

  • ignore

O valor padrão é error.

time_output_format

Este parâmetro permite que o usuário especifique o formato dos dados TIME retornados.

Os valores possíveis deste parâmetro são os valores possíveis para os formatos de hora especificados em Formatos de hora.

Este parâmetro afeta apenas a saída, não a entrada.

timestamp_ntz_output_format, . timestamp_ltz_output_format, . timestamp_tz_output_format

Estas opções especificam o formato de saída para os valores de carimbo de data/hora. Os valores padrão destas opções são:

Opção de configuração

Valor padrão

timestamp_ntz_output_format

"YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_ltz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_tz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

Se estas opções estiverem definidas em "sf_current", o conector utiliza os formatos especificados para a sessão.

partition_size_in_mb

Este parâmetro é usado quando o conjunto de resultados da consulta é muito grande e precisa ser dividido em múltiplas partições DataFrame. Este parâmetro especifica o tamanho não compactado recomendado para cada partição DataFrame. Para reduzir o número de partições, aumente este tamanho.

Este tamanho é usado como o valor recomendado; o tamanho real das partições pode ser menor ou maior.

Esta opção se aplica somente quando o parâmetro use_copy_unload é FALSE.

Este parâmetro é opcional.

O valor padrão é 100 (MB).

use_copy_unload

Se esse for FALSE, o Snowflake usa o formato de dados Arrow quando SELECTing dados. Se definido como TRUE, então o Snowflake reverte para o comportamento antigo de usar o comando COPY UNLOAD para transmitir dados selecionados.

Este parâmetro é opcional.

O valor padrão é FALSE.

treat_decimal_as_long

Se TRUE, configura o Spark Connector para retornar valores Long (em vez de valores BigDecimal) para consultas que retornam o tipo Decimal(precision, 0).

O valor padrão é FALSE.

Esta opção foi adicionada na versão 2.11.1 do Spark Connector.

s3_stage_vpce_dns_name

Especifica o nome de DNS de seu ponto de extremidade VPC para o acesso aos estágios internos.

Esta opção foi adicionada na versão 2.11.1 do Spark Connector.

support_share_connection

Se FALSE, configura o conector Spark para criar uma nova conexão JDBC para cada trabalho ou ação que usa as mesmas opções do conector Spark para acessar o Snowflake.

O valor padrão é TRUE, o que significa que os diferentes trabalhos e ações compartilham a mesma conexão JDBC se eles usarem as mesmas opções do conector Spark para acessar o Snowflake.

Se você precisar habilitar ou desabilitar essa configuração programaticamente, use as seguintes funções estáticas globais:

  • SparkConnectorContext.disableSharedConnection()

  • SparkConnectorContext.enableSharingJDBCConnection()

Nota

Nos seguintes casos especiais, o conector Spark não usa uma conexão JDBC compartilhada:

  • Se pré-ações ou pós-ações forem definidas e essas pré-ações ou pós-ações não forem CREATE TABLE, DROP TABLE ou MERGE INTO, o conector Spark não usa a conexão compartilhada.

  • Funções de utilitário em Utils, como Utils.runQuery() e Utils.getJDBCConnection() não usam a conexão compartilhada.

Esta opção foi adicionada na versão 2.11.2 do conector Spark.

force_skip_pre_post_action_check_for_shared_session

Se TRUE, configura o conector Spark para desabilitar a validação de pré-ações e pós-ações para compartilhamento de sessão.

O valor padrão é FALSE.

Importante

Antes de definir esta opção, verifique se as consultas em pré-ações e pós-ações não afetam as configurações da sessão. Caso contrário, você pode encontrar problemas com os resultados.

Esta opção foi adicionada na versão 2.11.3 do conector Spark.

Uso de autenticação e rodízio do par de chaves

O conector do Spark é compatível com a autenticação de par de chaves e a rotação de chaves.

  1. Para começar, complete a configuração inicial para autenticação de par de chaves como mostrado em Autenticação de pares de chaves e rotação de pares de chaves.

  2. Envie uma cópia não criptografada da chave privada usando a opção de conexão pem_private_key.

Atenção

Por razões de segurança, em vez de codificar o pem_private_key em seu aplicativo, você deve definir o parâmetro dinamicamente após ler a chave a partir de uma fonte segura. Se a chave estiver criptografada, então descriptografe-a e envie a versão descriptografada.

No exemplo com Python, observe que o arquivo pem_private_key, rsa_key.p8, está:

  • Sendo lido diretamente de um arquivo protegido por senha, usando a variável de ambiente PRIVATE_KEY_PASSPHRASE.

  • Usando a expressão pkb na cadeia de caracteres sfOptions.

Para conectar, você pode salvar o exemplo Python em um arquivo (ou seja, <file.py>) e então executar o seguinte comando:

spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>
Copy

Python

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os

with open("<path>/rsa_key.p8", "rb") as key_file:
  p_key = serialization.load_pem_private_key(
    key_file.read(),
    password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
    backend = default_backend()
    )

pkb = p_key.private_bytes(
  encoding = serialization.Encoding.PEM,
  format = serialization.PrivateFormat.PKCS8,
  encryption_algorithm = serialization.NoEncryption()
  )

pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')

sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "pem_private_key" : pkb,
  "sfDatabase" : "<database>",
  "sfSchema" : "schema",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "COLORS") \
    .load()

df.show()
Copy

Uso de OAuth externo

Começando pela versão do conector do Spark 2.7.0, você pode usar OAuth externo para autenticação no Snowflake usando o programa Scala ou o script Python de exemplo.

Antes de usar o OAuth externo e o conector do Spark para autenticação no Snowflake, configure uma integração de segurança de OAuth externo para um dos servidores de autorização de OAuth externo compatíveis ou para um cliente personalizado de OAuth externo.

Nos exemplos de Scala e Python, observe a substituição do parâmetro sfPassword pelos parâmetros sfAuthenticator e sfToken.

Scala:

// spark connector version

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.DataFrame

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<username>",
    "sfAuthenticator" -> "oauth",
    "sfToken" -> "<external_oauth_access_token>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "region")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Python:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfAuthenticator" : "oauth",
  "sfToken" : "<external_oauth_access_token>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()
Copy

Opções do AWS para transferência de dados externa

Estas opções são usadas para especificar o local do Amazon S3 onde os dados temporários são armazenados e fornecer detalhes de autenticação para acessar o local. Elas são necessárias somente se você estiver fazendo uma transferência de dados externa. As transferências de dados externas são necessárias se uma das seguintes opções for verdadeira:

  • Você está usando a versão 2.1.x ou anterior do conector do Spark (que não aceita transferências internas), ou

  • Sua transferência provavelmente levará 36 horas ou mais (as transferências internas utilizam credenciais temporárias que expiram após 36 horas).

tempDir

O local S3 onde os dados intermediários são armazenados (por exemplo, s3n://xy12345-bucket/spark-snowflake-tmp/).

Se tempDir for especificado, você também deve especificar uma destas opções:

  • awsAccessKey , awsSecretKey . ou

  • temporary_aws_access_key_id , temporary_aws_secret_access_key, temporary_aws_session_token

awsAccessKey , awsSecretKey

Estas são credenciais AWS padrão que permitem o acesso ao local especificado em tempDir. Observe que ambas as opções devem ser definidas em conjunto.

Se forem definidas, podem ser recuperadas do objeto SparkContext existente.

Se você especificar estas variáveis, também deverá especificar tempDir.

Estas credenciais também devem ser definidas para o cluster Hadoop.

temporary_aws_access_key_id , temporary_aws_secret_access_key, temporary_aws_session_token

Elas são credenciais AWS temporárias que permitem o acesso ao local especificado em tempDir. Observe que todas essas três opções devem ser definidas em conjunto.

Além disso, se essas opções forem definidas, terão precedência sobre as opções awsAccessKey e awsSecretKey.

Se você especificar temporary_aws_access_key_id , temporary_aws_secret_access_key e temporary_aws_session_token , também deverá especificar tempDir. Caso contrário, estes parâmetros são ignorados.

check_bucket_configuration

Se definido como on (padrão), o conector verifica se o bucket usado para transferência de dados tem uma política de ciclo de vida configurada (consulte Preparação de um bucket AWS S3 externo para mais informações). Se não houver uma política de ciclo de vida presente, um aviso é registrado.

Desabilitar essa opção (definindo-a como off) ignora essa verificação. Isto pode ser útil se um usuário puder acessar as operações de dados do bucket, mas não as políticas de ciclo de vida. Desabilitar a opção também pode acelerar ligeiramente os tempos de execução da consulta.

Para obter mais detalhes, consulte Autenticação do S3 para troca de dados (neste tópico).

Opções do Azure para transferência de dados externa

Esta seção descreve os parâmetros que se aplicam ao armazenamento de blobs do Azure ao fazer transferências de dados externas. As transferências de dados externas são necessárias se uma das seguintes opções for verdadeira:

  • Você está usando a versão 2.1.x ou anterior do conector do Spark (que não aceita transferências internas), ou

  • Sua transferência provavelmente levará 36 horas ou mais (as transferências internas utilizam credenciais temporárias que expiram após 36 horas).

Ao utilizar uma transferência externa com armazenamento de blobs do Azure, você especifica a localização do contêiner do Azure e a SAS (assinatura de acesso compartilhado) para esse contêiner usando os parâmetros descritos abaixo.

tempDir

O contêiner de armazenamento de blobs do Azure onde são armazenados os dados intermediários. Ele tem a forma de um URL, por exemplo:

wasb://<contêiner_do_azure>@<conta_do_azure>.<ponto_de_extremidade_do_azure>/

temporary_azure_sas_token

Especifique o token SAS para o armazenamento de blobs do Azure.

Para obter mais detalhes, consulte Autenticação do Azure para troca de dados (neste tópico).

Especificação de informações do Azure para armazenamento temporário no Spark

Ao utilizar o armazenamento de blobs do Azure para fornecer armazenamento temporário para transferência de dados entre o Spark e o Snowflake, você deve fornecer ao Spark, assim como ao conector Spark do Snowflake, o local e as credenciais para o armazenamento temporário.

Para fornecer ao Spark o local de armazenamento temporário, execute comandos semelhantes aos seguintes em seu cluster do Spark:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Copy

Observe que o último comando contém as seguintes variáveis:

  • <contêiner> e <conta>: são os nomes do contêiner e da conta para sua implantação do Azure.

  • <ponto_de_extremidade_do_azure>: o ponto de extremidade de seu local de implantação do Azure. Por exemplo, se você estiver usando uma implantação US do Azure, é provável que o ponto de extremidade seja blob.core.windows.net.

  • <azure_sas>: o token de segurança de Assinatura de Acesso Compartilhado.

Substitua cada uma dessas variáveis pelas informações adequadas para sua conta de armazenamento de blobs do Azure.

Passagem de parâmetros da sessão do Snowflake como opções para o conector

O conector do Snowflake para Spark aceita o envio de parâmetros arbitrários em nível de sessão ao Snowflake (consulte Parâmetros de sessão para mais informações). Isso pode ser feito adicionando-se um par ("<chave>" -> "<valor>") ao objeto options, onde <chave> é o nome do parâmetro da sessão e <valor> é o valor.

Nota

O <valor> deve ser uma cadeia de caracteres entre aspas duplas, mesmo para parâmetros que aceitam números ou valores booleanos (por exemplo, "1" ou "true").

Por exemplo, o código de exemplo a seguir passa o parâmetro de sessão USE_CACHED_RESULT com um valor de "false", que desabilita o uso dos resultados de consultas executadas anteriormente:

// ... assuming sfOptions contains Snowflake connector options

// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method
Copy

Considerações de segurança

Os clientes devem garantir que em um sistema Spark com múltiplos nós as comunicações entre os nós sejam seguras. O mestre do Spark envia credenciais do Snowflake aos nós de trabalho do Spark para que eles possam acessar os estágios do Snowflake. Se as comunicações entre o mestre e os nós de trabalho do Spark não forem seguras, as credenciais poderão ser lidas por um terceiro não autorizado.

Autenticação do S3 para troca de dados

Esta seção descreve como autenticar ao usar o S3 para troca de dados.

Esta tarefa é necessária somente em uma das seguintes circunstâncias:

  • A versão do conector do Snowflake para Spark é 2.1.x (ou anterior). A partir da versão v2.2.0, o conector utiliza um estágio temporário interno do Snowflake para a troca de dados. Se você não estiver usando a versão 2.2.0 (ou superior) do conector, a Snowflake recomenda fortemente a atualização para a versão mais recente.

  • A versão do conector do Snowflake para Spark é 2.2.0 (ou superior), mas seus trabalhos costumam exceder 36 horas. Esta é a duração máxima do token AWS utilizado pelo conector para acessar o estágio interno para troca de dados.

Se estiver usando uma versão mais antiga do conector, você precisa preparar um local S3 que o conector possa usar para trocar dados entre o Snowflake e o Spark.

Para permitir o acesso ao bucket/diretório S3 usado para trocar dados entre o Spark e o Snowflake (como especificado para tempDir), dois métodos de autenticação são aceitos:

  • Credenciais permanentes do AWS (também usadas para configurar a autenticação Hadoop/Spark para acessar o S3)

  • Credenciais temporárias do AWS

Uso de credenciais permanentes do AWS

Este é o método de autenticação padrão do AWS. Ele requer um par de valores awsAccessKey e awsSecretKey.

Nota

Esses valores também precisam ser usados para configurar o Hadoop/Spark para acessar o S3. Para obter mais informações, incluindo exemplos, consulte Autenticação no Hadoop/Spark usando S3A ou S3N (neste tópico).

Por exemplo:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)
Copy

Para detalhes sobre as opções aceitas por sfOptions, consulte Opções do AWS para transferência de dados externa (neste tópico).

Autenticação no Hadoop/Spark usando S3A ou S3N

Ecossistemas Hadoop/Spark aceitam 2 esquemas URI para acesso ao S3:

s3a://

Novo método recomendado (para Hadoop 2.7 e superior)

Para usar este método, modifique os exemplos do Scala neste tópico para adicionar as seguintes opções de configuração do Hadoop:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)
Copy

Certifique-se de que a opção tempdir utilize também s3a://.

s3n://

Método mais antigo (para Hadoop 2.6 e anterior)

Em alguns sistemas, é necessário especificá-lo explicitamente como mostrado no exemplo do Scala a seguir:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)
Copy

Uso de credenciais temporárias do AWS

Este método usa as opções de configuração temporary_aws_access_key_id, temporary_aws_secret_access_key e temporary_aws_session_token para o conector.

Este método oferece segurança adicional fornecendo ao Snowflake apenas acesso temporário ao bucket/diretório S3 utilizado para troca de dados.

Nota

As credenciais temporárias só podem ser usadas para configurar a autenticação S3 para o conector; elas não podem ser usadas para configurar a autenticação Hadoop/Spark.

Além disso, se você fornecer credenciais temporárias, elas têm precedência sobre quaisquer credenciais permanentes que tenham sido fornecidas.

O código do Scala a seguir fornece um exemplo de autenticação usando credenciais temporárias:

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as temporary_aws_access_key_id are, of course,
// also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())
Copy

sfOptions2 pode agora ser usado com o método de DataFrame options().

Autenticação do Azure para troca de dados

Esta seção descreve como autenticar ao utilizar o armazenamento de blobs do Azure para troca de dados.

A autenticação desta forma é necessária somente em uma das seguintes circunstâncias:

  • A versão do conector do Snowflake para Spark é 2.1.x (ou anterior). A partir da versão v2.2.0, o conector utiliza um estágio temporário interno do Snowflake para a troca de dados. Se você não estiver usando a versão 2.2.0 (ou superior) do conector, a Snowflake recomenda fortemente a atualização para a versão mais recente.

  • A versão do conector do Snowflake para Spark é 2.2.0 (ou superior), mas seus trabalhos costumam exceder 36 horas. Esta é a duração máxima do token Azure utilizado pelo conector para acessar o estágio interno de troca de dados.

Você precisa preparar um contêiner de armazenamento de blobs do Azure que o conector possa usar para trocar dados entre o Snowflake e o Spark.

Uso de credenciais do Azure

Este é o método de autenticação padrão do armazenamento de blobs do Azure. Ele requer um par de valores: tempDir (um URL) e valores temporary_azure_sas_token.

Nota

Estes valores também precisam ser usados para configurar o Hadoop/Spark para acessar o armazenamento de blobs do Azure. Para obter mais informações, incluindo exemplos, consulte Autenticação no Hadoop/Spark usando Azure (neste tópico).

Por exemplo:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

// Then, configure your Snowflake environment
//
val sfOptions = Map(
  "sfURL" -> "<account_identifier>.snowflakecomputing.com",
  "sfUser" -> "<user_name>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database_name>",
  "sfSchema" -> "<schema_name>",
  "sfWarehouse" -> "<warehouse_name>",
  "sfCompress" -> "on",
  "sfSSL" -> "on",
  "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/",
  "temporary_azure_sas_token" -> "<azure_sas>"
)
Copy

Para detalhes sobre as opções aceitas por sfOptions, consulte Opções do Azure para transferência de dados externa (neste tópico).

Autenticação no Hadoop/Spark usando Azure

Para usar este método, modifique os exemplos do Scala neste tópico para adicionar as seguintes opções de configuração do Hadoop:

val hadoopConf = sc.hadoopConfiguration
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Copy

Certifique-se de que a opção tempdir utilize também wasb://.

A autenticação através de um navegador não é aceita

Ao usar o conector do Spark, é impraticável usar qualquer forma de autenticação que abra uma janela do navegador para pedir credenciais ao usuário. A janela não apareceria necessariamente na máquina do cliente. Portanto, o conector do Spark não aceita nenhum tipo de autenticação, incluindo MFA (autenticação multifator) ou SSO (login único), que chame uma janela do navegador.