Utilisation du connecteur Spark¶
Le connecteur adhère à l’API Spark standard, mais avec deux options supplémentaires spécifiques à Snowflake décrites dans ce chapitre.
Dans ce chapitre, le terme COPY se réfère à :
COPY INTO <table> (utilisé pour transférer des données d’une zone de préparation interne ou externe vers une table).
COPY INTO <emplacement> (utilisé pour transférer des données d’une table vers une zone de préparation interne ou externe).
Dans ce chapitre :
Vérification de la connexion réseau à Snowflake avec SnowCD¶
Après avoir configuré votre pilote, vous pouvez évaluer et dépanner votre connectivité réseau à Snowflake en utilisant SnowCD.
Vous pouvez utiliser SnowCD pendant le processus de configuration initiale et à la demande à tout moment pour évaluer et dépanner votre connexion réseau à Snowflake.
Pushdown¶
Le connecteur Spark applique un pushdown de prédicat et de requête en capturant et en analysant les plans logiques Spark pour les opérations SQL. Lorsque la source de données est Snowflake, les opérations sont traduites en une requête SQL puis exécutées dans Snowflake pour améliorer les performances.
Cependant, comme cette traduction nécessite presque de traduire un par un les opérateurs SQL Spark en expressions Snowflake, tous les opérateurs Spark SQL ne peuvent pas être push down. Lorsque le pushdown échoue, le connecteur revient à un plan d’exécution moins optimisé. Les opérations non prises en charge sont à la place exécutées dans Spark.
Note
Si vous avez besoin de la fonction pushdown pour toutes les opérations, pensez à écrire votre code pour utiliser API Snowpark à la place.
Vous trouverez ci-dessous une liste des opérations prises en charge pour le pushdown (toutes les fonctions ci-dessous utilisent leurs noms Spark). Si une fonction ne figure pas dans cette liste, un plan Spark qui l’utilise peut être exécuté sur Spark plutôt que push down à Snowflake.
Fonctions d’agrégation
Average
Corr
CovPopulation
CovSample
Count
Max
Min
StddevPop
StddevSamp
Sum
VariancePop
VarianceSamp
Opérateurs booléens
And
Between
Contains
EndsWith
EqualTo
GreaterThan
GreaterThanOrEqual
In
IsNull
IsNotNull
LessThan
LessThanOrEqual
Not
Or
StartsWith
Fonctions de date, d’heure et d’horodatage
DateAdd
DateSub
Mois
Trimestre
TruncDate
TruncTimestamp
Année
Fonctions mathématiques
Opérateurs arithmétiques « + » (addition), « - » (soustraction), « * » (multiplication), « / » (division) et « - » (négation unaire).
Abs
Acos
Asin
Atan
Ceil
CheckOverflow
Cos
Cosh
Exp
Floor
Greatest
Least
Log
Pi
Pow
PromotePrecision
Rand
Round
Sin
Sinh
Sqrt
Tan
Tanh
Opérateurs divers
Alias (expressions AS)
BitwiseAnd
BitwiseNot
BitwiseOr
BitwiseXor
CaseWhen
Cast(enfant, t, _)
Coalesce
If
MakeDecimal
ScalarSubquery
ShiftLeft
ShiftRight
SortOrder
UnscaledValue
Opérateurs relationnels
Fonctions d’agrégation et clauses group-by
Distinct
Filters
In
InSet
Joins
Limits
Projections
Sorts (ORDER BY)
Union and Union All
Fonctions de fenêtre et clauses de fenêtrage
Fonctions de chaîne de caractères
Ascii
Concat(enfants)
Longueur
Like
Lower
StringLPad
StringRPad
StringTranslate
StringTrim
StringTrimLeft
StringTrimRight
Substring
Upper
Fonctions de fenêtre (remarque : elles ne fonctionnent pas avec Spark 2.2)
DenseRank
Rang
RowNumber
Utilisation du connecteur dans Scala¶
Spécification du nom de classe de source de données¶
Pour utiliser Snowflake comme source de données dans Spark, utilisez l’option .format
pour fournir le nom de classe du connecteur Snowflake qui définit la source de données.
net.snowflake.spark.snowflake
Pour assurer une vérification du nom de la classe lors de la compilation, Snowflake recommande fortement de définir une variable pour le nom de classe. Par exemple :
val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
De plus, par souci de facilité, la classe Utils
fournit la variable qui peut être importée comme suit :
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
Note
Tous les exemples de ce chapitre utilisent SNOWFLAKE_SOURCE_NAME
comme définition de classe.
Activation/désactivation du pushdown dans une session¶
La version 2.1.0 (et supérieure) du connecteur prend en charge le pushdown de requêtes, ce qui peut améliorer considérablement les performances en poussant le traitement des requêtes vers Snowflake lorsque Snowflake est la source de données Spark.
Par défaut, le pushdown est activé.
Pour désactiver le pushdown dans une session Spark pour un DataFrame donné :
Après avoir instancié un objet
SparkSession
, appelez la méthode statiqueSnowflakeConnectorUtils.disablePushdownSession
en transmettant l’objetSparkSession
. Par exemple :SnowflakeConnectorUtils.disablePushdownSession(spark)
Où
spark
est votre objetSparkSession
.Créez un DataFrame avec l’option autopushdown définie sur
off
. Par exemple :val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("query", query) .option("autopushdown", "off") .load()
Notez que vous pouvez également définir l’option
autopushdown
dans unMap
que vous transmettez à la méthodeoptions
(par exemple, danssfOptions
dans l’exemple ci-dessus).
Pour réactiver le pushdown après l’avoir désactivé, appelez la méthode statique SnowflakeConnectorUtils.enablePushdownSession
(en transmettant l’objet SparkSession
), et créez un DataFrame avec autopushdown
activé.
Transfert de données depuis Snowflake vers Spark¶
Note
Lorsque vous utilisez des DataFrames, le connecteur Snowflake ne prend en charge que les requêtes SELECT.
Pour lire des données de Snowflake vers un DataFrame Spark :
Utilisez la méthode
read()
de l’objetSqlContext
pour construire unDataFrameReader
.Spécifiez
SNOWFLAKE_SOURCE_NAME
en utilisant la méthodeformat()
. Pour la définition, voir Spécification du nom de classe de source de données (dans ce chapitre).Spécifiez les options du connecteur en utilisant la méthode
option()
ouoptions()
. Pour plus d’informations, voir Réglage des options de configuration du connecteur (dans ce chapitre).Spécifiez l’une des options suivantes pour les données de table à lire :
dbtable
: le nom de la table à lire. Toutes les colonnes et tous les enregistrements sont récupérés (c’est-à-dire que cela est équivalent àSELECT * FROM db_table
).query
: la requête exacte (instruction SELECT) à exécuter.
Notes sur l’utilisation¶
Actuellement, le connecteur ne prend pas en charge d’autres types de requête (par ex. les instructions SHOW, DESC ou DML) en cas d’utilisation de DataFrames.
Il y a une limite supérieure à la taille d’une ligne individuelle. Pour plus de détails, voir Limites de la taille du texte de requête.
Remarques sur les performances¶
Lorsque vous transférez des données entre Snowflake et Spark, utilisez les méthodes suivantes pour analyser/améliorer les performances :
Utilisez la méthode
net.snowflake.spark.snowflake.Utils.getLastSelect()
pour voir la requête actuelle émise lors du transfert de données depuis Snowflake vers Spark.Si vous utilisez la fonctionnalité
filter
ouwhere
du DataFrame Spark, vérifiez que les filtres respectifs sont présents dans la requête SQL émise. Le connecteur Snowflake essaie de traduire tous les filtres demandés par Spark en SQL.Cependant, il existe des formes de filtre que l’infrastructure Spark ne transmet pas aujourd’hui au connecteur Snowflake. Par conséquent, dans certaines situations, un grand nombre d’enregistrements inutiles sont demandés à Snowflake.
Si vous n’avez besoin que d’un sous-ensemble de colonnes, assurez-vous qu’il reflète le sous-ensemble dans la requête SQL.
En général, si la requête SQL émise ne correspond pas à ce que vous attendez sur la base des opérations de DataFrame, utilisez l’option
query
pour fournir la syntaxe SQL exacte que vous souhaitez.
Exemples¶
Lire une table entière :
val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t1") .load()
Lire les résultats d’une requête :
val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1") .load()
Transfert de données de Spark à Snowflake¶
Les étapes pour enregistrer le contenu d’un DataFrame dans une table Snowflake sont similaires à celles servant à effectuer une écriture depuis Snowflake vers Spark :
Utilisez la méthode
write()
duDataFrame
pour construire unDataFrameWriter
.Spécifiez
SNOWFLAKE_SOURCE_NAME
en utilisant la méthodeformat()
. Pour la définition, voir Spécification du nom de classe de source de données (dans ce chapitre).Spécifiez les options du connecteur en utilisant la méthode
option()
ouoptions()
. Pour plus d’informations, voir Réglage des options de configuration du connecteur (dans ce chapitre).Utilisez l’option
dbtable
pour spécifier la table dans laquelle les données sont écrites.Utilisez la méthode
mode()
pour spécifier le mode de sauvegarde du contenu.Pour plus d’informations, voir SaveMode (documentation Spark).
Exemples¶
df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t2") .mode(SaveMode.Overwrite) .save()
Exportation de JSON de Spark vers Snowflake¶
Les DataFrames Spark peuvent contenir des objets JSON sérialisés en tant que chaînes. Le code suivant fournit un exemple de conversion d’un DataFrame classique en DataFrame contenant des données JSON :
val rdd = myDataFrame.toJSON val schema = new StructType(Array(StructField("JSON", StringType))) val jsonDataFrame = sqlContext.createDataFrame( rdd.map(s => Row(s)), schema)
Notez que le résultat jsonDataFrame
contient une seule colonne de type StringType
. Par conséquent, lorsque ce DataFrame est exporté vers Snowflake avec le mode commun SaveMode.Overwrite
, une nouvelle table est créée dans Snowflake avec une seule colonne de type VARCHAR
.
Pour charger jsonDataFrame
dans une colonne VARIANT
:
Créez une table Snowflake (connectée à Snowflake dans Java à l’aide du pilote JDBC Snowflake). Pour des explications sur les paramètres de connexion utilisés dans l’exemple, voir Référence Paramètre de connexion pilote JDBC.
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; public class SnowflakeJDBCExample { public static void main(String[] args) throws Exception { String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.com/"; Properties properties = new Properties(); properties.put("user", "peter"); properties.put("password", "test"); properties.put("account", "myorganization-myaccount"); properties.put("warehouse", "mywh"); properties.put("db", "mydb"); properties.put("schema", "public"); // get connection System.out.println("Create JDBC connection"); Connection connection = DriverManager.getConnection(jdbcUrl, properties); System.out.println("Done creating JDBC connection\n"); // create statement System.out.println("Create JDBC statement"); Statement statement = connection.createStatement(); System.out.println("Done creating JDBC statement\n"); // create a table System.out.println("Create my_variant_table table"); statement.executeUpdate("create or replace table my_variant_table(json VARIANT)"); statement.close(); System.out.println("Done creating demo table\n"); connection.close(); System.out.println("Close connection\n"); } }
Au lieu d’utiliser
SaveMode.Overwrite
, utilisezSaveMode.Append
pour réutiliser la table existante. Lorsque la valeur de chaîne représentant JSON est chargée dans Snowflake car la colonne cible est de type VARIANT, elle est analysée comme JSON. Par exemple :df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "my_variant_table") .mode(SaveMode.Append) .save()
Exécution d’instructions DDL/DML SQL¶
Utilisez la méthode runQuery()
de l’objet Utils
pour exécuter les instructions DDL/DML SQL, en plus des requêtes, par exemple :
var sfOptions = Map(
"sfURL" -> "<account_identifier>.snowflakecomputing.com",
"sfUser" -> "<user_name>",
"sfPassword" -> "<password>",
"sfDatabase" -> "<database>",
"sfSchema" -> "<schema>",
"sfWarehouse" -> "<warehouse>"
)
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")
où sfOptions
est la carte des paramètres utilisée pour lire/écrire des DataFrames.
La méthode runQuery
ne renvoie que TRUE ou FALSE. Elle est destinée aux instructions qui ne renvoient pas de jeu de résultats, par exemple les instructions DDL telles que CREATE TABLE
et DML telles que INSERT
, UPDATE
et DELETE
. Cela n’est pas utile pour les instructions qui renvoient un jeu de résultats, tel que SELECT
ou SHOW
.
Utilisation d’horodatages et de fuseaux horaires¶
Spark ne fournit qu’un seul type d’horodatage équivalent au type d’horodatage Scala/Java. Son comportement est presque identique à celui du type de données TIMESTAMP_LTZ (fuseau horaire local) dans Snowflake. Ainsi, lorsque vous transférez des données entre Spark et Snowflake, nous vous recommandons d’utiliser les approches suivantes pour conserver l’heure correctement, par rapport aux fuseaux horaires :
Utilisez uniquement le type de données TIMESTAMP_LTZ dans Snowflake.
Note
Le mapping par défaut de type de données d’horodatage est TIMESTAMP_NTZ (aucun fuseau horaire). Vous devez donc définir explicitement le paramètre TIMESTAMP_TYPE_MAPPING pour utiliser TIMESTAMP_LTZ.
Réglez le fuseau horaire Spark sur
UTC
et utilisez ce fuseau horaire dans Snowflake (c.-à-d. ne réglez pas l’optionsfTimezone
pour le connecteur, et ne définissez pas explicitement un fuseau horaire dans Snowflake). Dans ce scénario, TIMESTAMP_LTZ et TIMESTAMP_NTZ sont effectivement équivalents.Pour régler le fuseau horaire, ajoutez la ligne suivante à votre code Spark :
java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
Si vous ne mettez pas en œuvre l’une de ces méthodes, des modifications non souhaitées peuvent survenir. Par exemple, considérons le scénario suivant :
Le fuseau horaire dans Spark est réglé sur
America/New_York
.Le fuseau horaire dans Snowflake est réglé sur
Europe/Warsaw
, ce qui peut arriver :En réglant
sfTimezone
surEurope/Warsaw
pour le connecteur.En réglant
sfTimezone
sursnowflake
pour le connecteur et en définissant le paramètre de session TIMEZONE dans Snowflake surEurope/Warsaw
.
TIMESTAMP_NTZ et TIMESTAMP_LTZ sont tous deux utilisés dans Snowflake.
Dans ce scénario :
Si une valeur représentant
12:00:00
dans une colonne TIMESTAMP_NTZ dans Snowflake est envoyée à Spark, cette valeur ne contient aucune information de fuseau horaire. Spark traite la valeur comme12:00:00
à New York.Si Spark renvoie cette valeur
12:00:00
(à New York) à Snowflake pour être chargée dans une colonne TIMESTAMP_LTZ , elle est automatiquement convertie et chargée comme18:00:00
(pour le fuseau horaire de Varsovie).Si cette valeur est ensuite convertie en TIMESTAMP_NTZ dans Snowflake, l’utilisateur voit
18:00:00
, qui est différent de la valeur originale,12:00:00
.
Pour résumer, Snowflake recommande de suivre strictement au moins une de ces règles :
Utilisez le même fuseau horaire, de préférence
UTC
pour Snowflake et Spark.Utilisez seulement le type de données TIMESTAMP_LTZ pour transférer des données entre Spark et Snowflake.
Exemple de programme Scala¶
Important
Cet exemple de programme suppose que vous utilisez la version 2.2.0 (ou supérieure) du connecteur qui utilise une zone de préparation interne Snowflake pour stocker des données temporaires, et qui n’a donc pas besoin d’un emplacement S3 pour stocker des données temporaires. Si vous utilisez une version antérieure, vous devez avoir un emplacement S3 existant et inclure des valeurs pour tempdir
, awsAccessKey
, awsSecretKey
pour sfOptions
. Pour plus de détails, voir Options AWS pour le transfert de données externe (dans ce chapitre).
Le programme Scala suivant fournit un cas d’utilisation complet pour le connecteur Snowflake pour Spark. Avant d’utiliser le code, remplacez les chaînes suivantes par les valeurs correspondantes, comme décrit dans Réglage des options de configuration du connecteur (dans ce chapitre) :
<identificateur_de_compte>
: votre identificateur de compte.<nom_utilisateur>
,<mot de passe>
: identifiants de connexion pour l’utilisateur Snowflake.<base_de_données>
,<schéma>
,<entrepôt>
: valeurs par défaut de la session Snowflake.
L’exemple de programme Scala utilise l’authentification de base (c’est-à-dire le nom d’utilisateur et le mot de passe). Si vous souhaitez vous authentifier avec OAuth, consultez Utilisation de l’OAuth externe (dans ce chapitre).
import org.apache.spark.sql._
//
// Configure your Snowflake environment
//
var sfOptions = Map(
"sfURL" -> "<account_identifier>.snowflakecomputing.com",
"sfUser" -> "<user_name>",
"sfPassword" -> "<password>",
"sfDatabase" -> "<database>",
"sfSchema" -> "<schema>",
"sfWarehouse" -> "<warehouse>"
)
//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", "t1")
.load()
//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("query", "select c1, count(*) from t1 group by c1")
.load()
//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", "t2")
.mode(SaveMode.Overwrite)
.save()
Utilisation du connecteur avec Python¶
L’utilisation du connecteur avec Python est très similaire à l’utilisation de Scala.
Nous recommandons d’utiliser le script bin/pyspark
inclus dans la distribution Spark.
Configuration du script pyspark
¶
Le script pyspark
doit être configuré de la même manière que le script spark-shell
, en utilisant les options --packages
ou --jars
. Par exemple :
bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3
N’oubliez pas d’inclure les fichiers .jar du connecteur Snowflake Spark et du connecteur JDBC dans votre variable d’environnement CLASSPATH.
Pour plus d’informations sur la configuration du script spark-shell
, voir Étape 4 : Configuration du cluster local Spark ou de l’environnement Spark hébergé sur Amazon EMR.
Activation/désactivation du pushdown dans une session¶
La version 2.1.0 (et supérieure) du connecteur prend en charge le pushdown de requêtes, ce qui peut améliorer considérablement les performances en poussant le traitement des requêtes vers Snowflake lorsque Snowflake est la source de données Spark.
Par défaut, le pushdown est activé.
Pour désactiver le pushdown dans une session Spark pour un DataFrame donné :
Après avoir instancié un objet
SparkSession
, appelez la méthode statiqueSnowflakeConnectorUtils.disablePushdownSession
en transmettant l’objetSparkSession
. Par exemple :sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
Créez un DataFrame avec l’option autopushdown définie sur
off
. Par exemple :df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", query) \ .option("autopushdown", "off") \ .load()
Notez que vous pouvez également définir l’option
autopushdown
dans unDictionary
que vous transmettez à la méthodeoptions
(par exemple, danssfOptions
dans l’exemple ci-dessus).
Pour réactiver le pushdown après l’avoir désactivé, appelez la méthode statique SnowflakeConnectorUtils.enablePushdownSession
(en transmettant l’objet SparkSession
), et créez un DataFrame avec autopushdown
activé.
Exemple de script Python¶
Important
Cet exemple de script suppose que vous utilisez la version 2.2.0 (ou supérieure) du connecteur qui utilise une zone de préparation interne Snowflake pour stocker des données temporaires, et qui n’a donc pas besoin d’un emplacement S3 pour stocker ces données. Si vous utilisez une version antérieure, vous devez avoir un emplacement S3 existant et inclure des valeurs pour tempdir
, awsAccessKey
, awsSecretKey
pour sfOptions
. Pour plus de détails, voir Options AWS pour le transfert de données externe (dans ce chapitre).
Une fois le script pyspark
configuré, vous pouvez effectuer des requêtes SQL et d’autres opérations. Voici un exemple de script Python qui effectue une simple requête SQL. Ce script illustre l’utilisation de base des connecteurs. La plupart des exemples Scala de ce document peuvent être adaptés avec un minimum d’efforts/de modifications pour une utilisation avec Python.
L’exemple de script Python utilise l’authentification de base (c’est-à-dire le nom d’utilisateur et le mot de passe). Si vous souhaitez vous authentifier avec OAuth, consultez Utilisation de l’OAuth externe (dans ce chapitre).
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext
sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')
# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")
# Set options below
sfOptions = {
"sfURL" : "<account_identifier>.snowflakecomputing.com",
"sfUser" : "<user_name>",
"sfPassword" : "<password>",
"sfDatabase" : "<database>",
"sfSchema" : "<schema>",
"sfWarehouse" : "<warehouse>"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("query", "select 1 as my_num union all select 2 as my_num") \
.load()
df.show()
Astuce
Notez l’utilisation de sfOptions
et SNOWFLAKE_SOURCE_NAME
. Cela simplifie le code et réduit les risques d’erreur.
Pour plus de détails sur les options prises en charge pour sfOptions
, voir Réglage des options de configuration du connecteur (dans ce chapitre).
Mappages de type de données¶
Le connecteur Spark prend en charge la conversion entre de nombreux types de données classiques.
De SQL Spark vers Snowflake¶
Type de données Spark
Type de données Snowflake
ArrayType
VARIANT
BinaryType
Non pris en charge
BooleanType
BOOLEAN
ByteType
INTEGER. Snowflake ne prend pas en charge le type BYTE.
DateType
DATE
DecimalType
DECIMAL
DoubleType
DOUBLE
FloatType
FLOAT
IntegerType
INTEGER
LongType
INTEGER
MapType
VARIANT
ShortType
INTEGER
StringType
Si une longueur est spécifiée, VARCHAR(N) ; autrement, VARCHAR
StructType
VARIANT
TimestampType
TIMESTAMP
De Snowflake vers SQL Spark.¶
Type de données Snowflake
Type de données Spark
ARRAY
StringType
BIGINT
DecimalType(38, 0)
BINARY
Non pris en charge
BLOB
Non pris en charge
BOOLEAN
BooleanType
CHAR
StringType
CLOB
StringType
DATE
DateType
DECIMAL
DecimalType
DOUBLE
DoubleType
FLOAT
DoubleType
INTEGER
DecimalType(38, 0)
OBJECT
StringType
TIMESTAMP
TimestampType
TIME
StringType
(Version du connecteur Spark 2.4.14 ou ultérieure)VARIANT
StringType
Appel de la méthode DataFrame.show¶
Si vous appelez la méthode DataFrame.show
et transmettez un nombre inférieur au nombre de lignes dans le DataFrame, construisez un DataFrame qui contient uniquement les lignes à afficher dans un ordre trié.
Pour ce faire :
Appelez d’abord la méthode
sort
pour renvoyer un DataFrame qui contient des lignes triées.Appelez la méthode
limit
sur ce DataFrame pour renvoyer un DataFrame qui ne contient que les lignes que vous voulez afficher.Appelez la méthode
show
sur le DataFrame retourné.
Par exemple, si vous voulez afficher 5 lignes et que vous voulez que les résultats soient triés par la colonne my_col
:
val dfWithRowsToShow = originalDf.sort("my_col").limit(5) dfWithRowsToShow.show(5)
Sinon, si vous appelez show
pour afficher un sous-ensemble de lignes dans DataFrame, différentes exécutions du code pourraient entraîner l’affichage de différentes lignes.
Réglage des options de configuration du connecteur¶
Les sections suivantes énumèrent les options que vous définissez pour configurer le comportement du connecteur :
Pour définir ces options, appelez la méthode .option(<key>, <valeur>)
ou .options(<map>)
de la classe Spark DataframeReader.
Astuce
Pour faciliter l’utilisation des options, Snowflake recommande de spécifier les options dans un seul objet Map
et d’appeler .options(<map>)
pour définir les options.
Options de connexion requises¶
Les options suivantes sont requises pour la connexion à Snowflake :
sfUrl
Spécifie le nom d’hôte de votre compte au format suivant :
account_identifier.snowflakecomputing.com
account_identifier
est votre identificateur de compte.sfUser
Nom de connexion de l’utilisateur Snowflake.
Vous devez utiliser l’une des options suivantes pour vous authentifier :
sfPassword
Mot de passe de l’utilisateur Snowflake.
pem_private_key
Clé privée (au format PEM) pour l’authentification par paire de clés. Pour obtenir des instructions, voir Authentification par paire de clés et rotation de paires de clés.
sfAuthenticator
Spécifie l’utilisation de External OAuth pour s’authentifier auprès de Snowflake. Définissez la valeur sur
oauth
.L’utilisation de l’OAuth externe nécessite de définir le paramètre
sfToken
.
sfToken
(Nécessaire en cas d’utilisation de External OAuth) Définissez la valeur sur votre jeton d’accès External OAuth.
Le paramètre de connexion nécessite de définir la valeur du paramètre
sfAuthenticator
suroauth
.La valeur par défaut est aucun.
Options de contexte requises¶
Les options suivantes sont nécessaires pour définir le contexte de base de données et de schéma de la session :
sfDatabase
La base de données à utiliser pour la session après la connexion.
sfSchema
Le schéma à utiliser pour la session après la connexion.
Options de contexte supplémentaires¶
Les options énumérées dans cette section ne sont pas obligatoires.
sfAccount
Identificateur du compte (par exemple
myorganization-myaccount
). Cette option n’est plus nécessaire car l’identificateur du compte est spécifié danssfUrl
. Elle n’est documentée ici que pour des raisons de rétrocompatibilité.sfWarehouse
L’entrepôt virtuel par défaut à utiliser pour la session après la connexion.
sfRole
Le rôle de sécurité par défaut à utiliser pour la session après la connexion.
Options de proxy¶
Les options énumérées dans cette section ne sont pas obligatoires.
use_proxy
Indique si le connecteur doit utiliser un proxy :
true
spécifie que le connecteur doit utiliser un proxy.false
spécifie que le connecteur ne doit pas utiliser de proxy.
La valeur par défaut est
false
.proxy_host
(Nécessaire si
use_proxy
esttrue
) Spécifie le nom d’hôte du serveur proxy à utiliser.proxy_port
(Nécessaire si
use_proxy
esttrue
) Spécifie le numéro de port du serveur proxy à utiliser.proxy_protocol
Spécifie le protocole utilisé pour se connecter au serveur proxy. Vous pouvez spécifier l’une des valeurs suivantes :
http
https
La valeur par défaut est
http
.Ceci n’est pris en charge que pour Snowflake sur AWS.
Cette option a été ajoutée dans la version 2.11.1 du Connecteur Spark.
proxy_user
Spécifie le nom d’utilisateur pour l’authentification au serveur proxy. Définissez cette option si le serveur proxy nécessite une authentification.
Ceci n’est pris en charge que pour Snowflake sur AWS.
proxy_password
Spécifie le mot de passe de
proxy_user
pour l’authentification au serveur proxy. Définissez cette option si le serveur proxy nécessite une authentification.Ceci n’est pris en charge que pour Snowflake sur AWS.
non_proxy_hosts
Spécifie la liste des hôtes auxquels le connecteur doit se connecter directement, sans passer par le serveur proxy.
Séparez les noms d’hôtes par un symbole barre verticale avec échappement d’URL(
%7C
). Vous pouvez également utiliser un astérisque (*
) comme caractère générique.Ceci n’est pris en charge que pour Snowflake sur AWS.
Options supplémentaires¶
Les options énumérées dans cette section ne sont pas obligatoires.
sfTimezone
Le fuseau horaire à utiliser par Snowflake en cas d’utilisation de Spark. Notez que le paramètre définit uniquement le fuseau horaire dans Snowflake. L’environnement Spark reste inchangé. Les valeurs prises en charge sont les suivantes :
spark
: utilisez le fuseau horaire de Spark (par défaut).snowflake
: utilisez le fuseau horaire actuel pour Snowflake.sf_default
: utilisez le fuseau horaire par défaut pour l’utilisateur Snowflake qui se connecte.time_zone
: Utilisez un fuseau horaire spécifique (par ex.America/New_York
), si valide.Pour plus d’informations sur les conséquences de cette option, voir Utilisation d’horodatages et de fuseaux horaires (dans ce chapitre).
sfCompress
Si l’option est réglée sur
on
(par défaut), les données transmises entre Snowflake et Spark sont compressées.s3MaxFileSize
La taille du fichier utilisé lors du transfert de données depuis Snowflake vers Spark. La valeur par défaut est 10MB.
preactions
Une liste de commandes SQL séparées par des points-virgules qui sont exécutées avant le transfert des données entre Spark et Snowflake.
Si une commande SQL contient
%s
,%s
est remplacée par le nom de la table référencée pour l’opération.postactions
Une liste de commandes SQL séparées par des points-virgules qui sont exécutées après le transfert des données entre Spark et Snowflake.
Si une commande SQL contient
%s
, elle est remplacée par le nom de la table référencée pour l’opération.truncate_columns
Si l’option est définie sur
on
(par défaut), une commande COPY tronque automatiquement les chaînes de texte dépassant la longueur de la colonne cible. Si l’option est définie suroff
, la commande produit une erreur si une chaîne chargée dépasse la longueur de la colonne cible.truncate_table
Ce paramètre contrôle si le schéma d’une table cible Snowflake est conservé lorsque la table est écrasée.
Par défaut, lorsqu’une table cible dans Snowflake est écrasée, le schéma de cette table cible est également écrasé. Le nouveau schéma est basé sur le schéma de la table source (le dataframe Spark).
Cependant, le schéma de la source n’est parfois pas idéal. Par exemple, un utilisateur peut vouloir qu’une table cible Snowflake puisse stocker des valeurs FLOAT à l’avenir, même si le type de données de la colonne source initiale est INTEGER. Dans ce cas, le schéma de la table Snowflake ne doit pas être écrasé. La table Snowflake doit simplement être tronquée et réutilisée avec son schéma actuel.
Les valeurs possibles de ce paramètre sont :
on
off
Si ce paramètre est
on
, le schéma original de la table cible est conservé. Si ce paramètre estoff
, alors l’ancien schéma de la table est ignoré, et un nouveau schéma est généré sur la base du schéma de la source.Ce paramètre est facultatif.
La valeur par défaut de ce paramètre est
off
(c’est-à-dire que, par défaut, le schéma de la table originale est écrasé).Pour plus de détails sur le mappage de types de données Spark à des types de données Snowflake (et vice versa), voir Mappages de type de données (dans ce chapitre).
continue_on_error
Cette variable contrôle si la commande COPY abandonne si l’utilisateur saisit des données non valides (par exemple, format JSON invalide pour une colonne de type de données variable).
Les valeurs possibles sont :
on
off
La valeur
on
signifie continuer même si une erreur se produit. La valeuroff
signifie annuler si une erreur est détectée.Ce paramètre est facultatif.
La valeur par défaut de ce paramètre est
off
.Il n’est pas recommandé d’activer cette option. Si des erreurs sont détectées lors du processus de COPYing vers Snowflake à l’aide du connecteur Spark, alors certaines données peuvent être absentes.
Note
Si des lignes sont rejetées ou absents, et que ces lignes ne sont pas clairement défectueuses dans la source d’entrée, veuillez le signaler à Snowflake.
usestagingtable
Ce paramètre contrôle si le chargement de données utilise une table de mise en zone de préparation.
Une table d’échelonnement est une table normale (avec un nom temporaire) qui est créée par le connecteur. Si l’opération de chargement des données est réussie, la table cible originale est détruite et la table d’échelonnement est renommée au nom de la table cible d’origine. Si l’opération de chargement des données échoue, la table d’échelonnement est détruite et la table cible se retrouve avec les données qu’elle avait immédiatement avant l’opération. Ainsi, la table d’échelonnement permet de conserver les données de la table cible d’origine en cas d’échec de l’opération. Pour des raisons de sécurité, Snowflake recommande fortement l’utilisation d’une table d’échelonnement dans la plupart des cas.
Pour que le connecteur puisse créer une table d’échelonnement, l’utilisateur exécutant le COPY via le connecteur Spark doit avoir suffisamment de privilèges pour créer une table. Le chargement direct (c’est-à-dire le chargement sans utiliser de table d’échelonnement) est utile si l’utilisateur n’a pas la permission de créer une table.
Les valeurs possibles de ce paramètre sont :
on
off
Si le paramètre est
on
, une table d’échelonnement est utilisée. Si ce paramètre estoff
, les données sont chargées directement dans la table cible.Ce paramètre est facultatif.
La valeur par défaut de ce paramètre est
on
(c.-à-d., utiliser une table de mise en zone de préparation).
autopushdown
Ce paramètre contrôle si le pushdown automatique de requêtes est activé.
Si le pushdown est activé, alors, lorsqu’une requête est exécutée sur Spark, si une partie de la requête peut être « poussée vers le bas » vers le serveur Snowflake, elle est alors poussée vers le bas. Ceci améliore les performances de certaines requêtes.
Ce paramètre est facultatif.
La valeur par défaut est
on
si le connecteur est connecté à une version compatible de Spark. Sinon, la valeur par défaut estoff
.Si le connecteur est connecté à une version différente de Spark que celle à laquelle il est destiné (par exemple, si la version 3.2 du connecteur est connectée à la version 3.3 de Spark), l’auto-pushdown est désactivé même si ce paramètre est réglé sur
on
.purge
Si cette option est définie sur
on
, le connecteur supprime les fichiers temporaires créés lors du transfert depuis Spark vers Snowflake via un transfert de données externe. Si ce paramètre est réglé suroff
, ces fichiers ne sont pas automatiquement supprimés par le connecteur.La purge ne fonctionne que pour les transferts de Spark vers Snowflake, et non pour les transferts de Snowflake vers Spark.
Les valeurs possibles sont :
on
off
La valeur par défaut est
off
.columnmap
Ce paramètre est utile lorsque vous écrivez des données de Spark dans Snowflake et que les noms de colonne de la table Snowflake ne correspondent pas à ceux de la table Spark. Vous pouvez créer une carte qui indique quelle colonne source Spark correspond à quelle colonne cible Snowflake.
Le paramètre est un littéral de chaîne unique, sous la forme suivante :
"Map(col_2 -> col_b, col_3 -> col_a)"
Par exemple, considérons le scénario suivant :
Supposons qu’un Dataframe nommé
df
dans Spark ait trois colonnes :col_1
,col_2
,col_3
Une table nommée
tb
dans Snowflake a deux colonnes :col_a
,col_b
Vous souhaitez copier les valeurs suivantes :
De
df.col_2
àtb.col_b
.De
df.col_3
àtb.col_a
.
La valeur du paramètre
columnmap
serait :Map(col_2 -> col_b, col_3 -> col_a)
Vous pouvez générer cette valeur en exécutant le code Scala suivant :
Map("col_2"->"col_b","col_3"->"col_a").toString()
La valeur par défaut de ce paramètre est nulle. En d’autres termes, par défaut, les noms de colonne des tables source et cible doivent correspondre.
Ce paramètre n’est utilisé que pour effectuer une écriture depuis Spark vers Snowflake. Il ne s’applique pas pour une écriture depuis Snowflake vers Spark.
keep_column_case
Lors de l’écriture d’une table de Spark dans Snowflake, le connecteur Spark décale par défaut les lettres des noms de colonnes en majuscules, à moins que les noms de colonnes ne soient entre guillemets.
Lors de l’écriture d’une table de Snowflake dans Spark, le connecteur Spark ajoute par défaut des guillemets autour de tout nom de colonne contenant des caractères, à l’exception des lettres majuscules, des caractères de soulignement et des chiffres.
Si vous définissez keep_column_case sur
on
, le connecteur Spark n’effectuera pas ces modifications.Les valeurs possibles sont :
on
off
La valeur par défaut est
off
.column_mapping
Le connecteur doit mapper les colonnes du cadre de données Spark à la table Snowflake. Cela peut être effectué en fonction des noms de colonne (quel que soit leur ordre) ou de leur ordre (c’est-à-dire que la première colonne du cadre de données est mappée avec la première colonne de la table, quel que soit le nom de la colonne).
Par défaut, le mappage est effectué en fonction de l’ordre. Vous pouvez remplacer ce paramètre en définissant ce paramètre sur
name
, qui indique au connecteur de mapper les colonnes en fonction de leurs noms. (La correspondance de noms est insensible à la casse.)Les valeurs possibles de ce paramètre sont :
order
name
La valeur par défaut est
order
.column_mismatch_behavior
Ce paramètre ne s’applique que lorsque le paramètre
column_mapping
est défini surname
.Si les noms de colonne dans le cadre de données Spark et la table de Snowflake ne correspondent pas, alors :
Si
column_mismatch_behavior
esterror
, Spark Connector signale une erreur.Si
column_mismatch_behavior
estignore
, Spark Connector ignore l’erreur.Le pilote supprime toute colonne du cadre de données Spark qui n’a pas de colonne correspondante dans la table Snowflake.
Le pilote insère NULLs dans n’importe quelle colonne de la table Snowflake qui n’a pas de colonne correspondante dans le cadre de données Spark.
Les erreurs potentielles incluent :
Le cadre de données Spark peut contenir des colonnes identiques à l’exception de la casse (majuscules/minuscules). Étant donné que le mappage des noms de colonne ne respecte pas la casse, il n’est pas possible de déterminer le mappage correct du cadre de données à la table.
La table Snowflake peut contenir des colonnes identiques à l’exception de la casse (majuscules/minuscules). Étant donné que le mappage des noms de colonne ne respecte pas la casse, il n’est pas possible de déterminer le mappage correct du cadre de données à la table.
Le cadre de données Spark et la table Snowflake peuvent ne pas avoir de noms de colonnes en commun. En théorie, le connecteur Spark pourrait insérer NULLs dans chaque colonne de chaque ligne, mais cela n’a généralement pas de sens. Le connecteur renvoie donc une erreur même si
column_mismatch_behavior
est défini surignore
.
Les valeurs possibles de ce paramètre sont :
error
ignore
La valeur par défaut est
error
.time_output_format
Ce paramètre permet à l’utilisateur de spécifier le format des données
TIME
renvoyées.Les valeurs possibles de ce paramètre sont les valeurs possibles pour les formats d’heure spécifiés à Formats d’heure.
Ce paramètre affecte uniquement la sortie, pas l’entrée.
timestamp_ntz_output_format
, .timestamp_ltz_output_format
, .timestamp_tz_output_format
Ces options spécifient le format de sortie des valeurs d’horodatage. Les valeurs par défaut de ces options sont les suivantes :
Option de configuration
Valeur par défaut
timestamp_ntz_output_format
"YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_ltz_output_format
"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_tz_output_format
"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"
Si ces options ont pour valeur
"sf_current"
, le connecteur utilise les formats spécifiés pour la session.partition_size_in_mb
Ce paramètre est utilisé lorsque l’ensemble de résultats de la requête est très volumineux et doit être divisé en plusieurs partitions DataFrame. Ce paramètre spécifie la taille non compressée recommandée pour chaque partition DataFrame. Pour réduire le nombre de partitions, augmentez cette taille.
Cette taille est utilisée comme taille recommandée, la taille réelle des partitions peut être plus petite ou plus grande.
Cette option s’applique uniquement lorsque le paramètre use_copy_unload est FALSE.
Ce paramètre est facultatif.
La valeur par défaut est
100
(MB).
use_copy_unload
Si la valeur est
FALSE
, Snowflake utilise le format de données Arrow lors de la [sélection] SELECTing de données. Si la valeur estTRUE
, Snowflake revient à l’ancien comportement consistant à utiliser la commandeCOPY UNLOAD
pour transmettre les données sélectionnées.Ce paramètre est facultatif.
La valeur par défaut est
FALSE
.treat_decimal_as_long
Si
TRUE
, configure le connecteur Spark pour retourner des valeursLong
(plutôt que des valeursBigDecimal
) pour les requêtes qui retournent le typeDecimal(precision, 0)
.La valeur par défaut est
FALSE
.Cette option a été ajoutée dans la version 2.11.1 du Connecteur Spark.
s3_stage_vpce_dns_name
Spécifie le nom DNS du point de terminaison de votre VPCpour l’accès aux zones de préparation internes.
Cette option a été ajoutée dans la version 2.11.1 du Connecteur Spark.
support_share_connection
Si
FALSE
, configure le connecteur Spark pour qu’il crée une nouvelle connexion JDBC pour chaque tâche ou action qui utilise les mêmes options du connecteur Spark pour accéder à Snowflake.La valeur par défaut est
TRUE
, ce qui signifie que les différentes tâches et actions partagent la même connexion JDBC s’ils utilisent les mêmes options du connecteur Spark pour accéder à Snowflake.Si vous devez activer ou désactiver ce paramètre par programmation, utilisez les fonctions statiques globales suivantes :
SparkConnectorContext.disableSharedConnection()
SparkConnectorContext.enableSharingJDBCConnection()
Note
Dans les cas particuliers suivants, le connecteur Spark n’utilise pas de connexion JDBC partagée :
Si des préactions ou postactions sont définies et que ces préactions ou postactions ne sont pas CREATE TABLE, DROP TABLE ou MERGE INTO, le connecteur Spark n’utilise pas la connexion partagée.
Les fonctions utilitaires telles que
Utils.runQuery()
etUtils.getJDBCConnection()
n’utilisent pas la connexion partagée.
Cette option a été ajoutée dans la version 2.11.2 du connecteur Spark.
force_skip_pre_post_action_check_for_shared_session
Si
TRUE
, configure le connecteur Spark pour qu’il désactive la validation des préactions et des postactions pour le partage de session.La valeur par défaut est
FALSE
.Important
Avant de définir cette option, assurez-vous que les requêtes dans préactions et postactions n’affectent pas les paramètres de la session. Dans le cas contraire, vous risquez de ne pas obtenir les résultats escomptés.
Cette option a été ajoutée dans la version 2.11.3 du connecteur Spark.
Utilisation de l’authentification par paires de clés et rotation des paires de clés¶
Le connecteur Spark prend en charge l’authentification par paire de clés et la rotation des clés.
Pour commencer, complétez la configuration initiale pour l’authentification de la paire de clés comme indiqué dans Authentification par paire de clés et rotation de paires de clés.
Envoyez une copie non chiffrée de la clé privée à l’aide de l’option de connexion
pem_private_key
.
Attention
Pour des raisons de sécurité, plutôt que de coder en dur la pem_private_key
dans votre application, vous devez définir le paramètre de manière dynamique après avoir lu la clé à partir d’une source sécurisée. Si la clé est chiffrée, déchiffrez-la et envoyez la version déchiffrée.
Dans l’exemple Python, notez que le fichier pem_private_key
, rsa_key.p8
, est :
Est directement lu à partir d’un fichier protégé par mot de passe, à l’aide de la variable d’environnement
PRIVATE_KEY_PASSPHRASE
.Utilise l’expression
pkb
dans la chaînesfOptions
.
Pour vous connecter, vous pouvez enregistrer l’exemple Python dans un fichier (par ex. : <file.py>
) puis exécuter la commande suivante :
spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>
Python
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os
with open("<path>/rsa_key.p8", "rb") as key_file:
p_key = serialization.load_pem_private_key(
key_file.read(),
password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
backend = default_backend()
)
pkb = p_key.private_bytes(
encoding = serialization.Encoding.PEM,
format = serialization.PrivateFormat.PKCS8,
encryption_algorithm = serialization.NoEncryption()
)
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")
sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')
sfOptions = {
"sfURL" : "<account_identifier>.snowflakecomputing.com",
"sfUser" : "<user_name>",
"pem_private_key" : pkb,
"sfDatabase" : "<database>",
"sfSchema" : "schema",
"sfWarehouse" : "<warehouse>"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("query", "COLORS") \
.load()
df.show()
Utilisation de l’OAuth externe¶
À partir de la version du connecteur Spark 2.7.0, vous pouvez utiliser External OAuth pour vous authentifier auprès de Snowflake à l’aide de l’exemple de programme Scala ou de l’exemple de script Python.
Avant d’utiliser OAuth externe et le connecteur Spark pour vous authentifier auprès de Snowflake, configurez une intégration de sécurité OAuth externe pour l’un des serveurs d’autorisation OAuth externe pris en charge ou un client personnalisé External OAuth externe.
Dans les exemples Scala et Python, notez le remplacement du paramètre sfPassword
par les paramètres sfAuthenticator
et sfToken
.
Scala :
// spark connector version val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME import org.apache.spark.sql.DataFrame var sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<username>", "sfAuthenticator" -> "oauth", "sfToken" -> "<external_oauth_access_token>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) // // Create a DataFrame from a Snowflake table // val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "region") .load() // // Join, augment, aggregate, etc. the data in Spark and then use the // Data Source API to write the data back to a table in Snowflake // df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t2") .mode(SaveMode.Overwrite) .save()
Python :
from pyspark import SparkConf, SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import * sc = SparkContext("local", "Simple App") spark = SQLContext(sc) spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>') # You might need to set these sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>") sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>") # Set options below sfOptions = { "sfURL" : "<account_identifier>.snowflakecomputing.com", "sfUser" : "<user_name>", "sfAuthenticator" : "oauth", "sfToken" : "<external_oauth_access_token>", "sfDatabase" : "<database>", "sfSchema" : "<schema>", "sfWarehouse" : "<warehouse>" } SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", "select 1 as my_num union all select 2 as my_num") \ .load() df.show()
Options AWS pour le transfert de données externe¶
Ces options sont utilisées pour spécifier l’emplacement Amazon S3 où les données temporaires sont stockées, et fournissent des détails d’authentification pour accéder à cet emplacement. Ils sont uniquement requis si vous effectuez un transfert de données externe. Les transferts de données externes sont nécessaires si l’une des affirmations suivantes est vraie :
Vous utilisez la version 2.1.x ou une version inférieure du connecteur Spark (qui ne prend pas en charge les transferts internes).
Votre transfert prendra probablement 36 heures ou plus (les transferts internes utilisent des informations d’authentification temporaires qui expirent après 36 heures).
tempDir
L’emplacement S3 où sont stockées les données préparées (par exemple
s3n://xy12345-bucket/spark-snowflake-tmp/
).Si
tempDir
est spécifié, vous devez également spécifier :awsAccessKey
,awsSecretKey
. outemporary_aws_access_key_id
,temporary_aws_secret_access_key
,temporary_aws_session_token
awsAccessKey
,awsSecretKey
Il s’agit d’informations d’authentification AWS standard qui permettent d’accéder à l’emplacement spécifié dans
tempDir
. Notez que ces deux options doivent être définies ensemble.Si elles sont définies, elles peuvent être récupérées depuis un objet
SparkContext
existant.Si vous spécifiez ces variables, vous devez également spécifier
tempDir
.Ces informations d’authentification doivent également être définies pour le cluster Hadoop.
temporary_aws_access_key_id
,temporary_aws_secret_access_key
,temporary_aws_session_token
Il s’agit d’informations d’authentification AWS temporaires qui permettent d’accéder à l’emplacement spécifié dans
tempDir
. Notez que ces trois options doivent être définies ensemble.De plus, si ces options sont définies, elles ont priorité sur les options
awsAccessKey
etawsSecretKey
.Si vous spécifiez
temporary_aws_access_key_id
,temporary_aws_secret_access_key
ettemporary_aws_session_token
, vous devez également spécifiertempDir
. Sinon, ces paramètres sont ignorés.check_bucket_configuration
Si le paramètre est réglé sur
on
(par défaut), le connecteur vérifie si le compartiment utilisé pour le transfert de données possède une politique de cycle de vie configurée (voir Préparation d’un compartiment S3 AWS externe pour plus d’informations). S’il n’y a aucune politique de cycle de vie, un avertissement est enregistré.La désactivation de cette option (en la réglant sur
off
) ignore cette vérification. Ceci peut être utile si un utilisateur peut accéder aux opérations de données de compartiment, mais pas aux politiques de cycle de vie du compartiment. La désactivation de cette option peut également accélérer légèrement les temps d’exécution des requêtes.
Pour plus de détails, voir Authentification S3 pour l’échange de données (dans ce chapitre).
Options Azure pour le transfert de données externe¶
Cette section décrit les paramètres qui s’appliquent au stockage Azure Blob lors de transferts de données externes. Les transferts de données externes sont nécessaires si l’une des affirmations suivantes est vraie :
Vous utilisez la version 2.1.x ou une version inférieure du connecteur Spark (qui ne prend pas en charge les transferts internes).
Votre transfert prendra probablement 36 heures ou plus (les transferts internes utilisent des informations d’authentification temporaires qui expirent après 36 heures).
Lorsque vous utilisez un transfert externe avec le stockage Azure Blob, vous spécifiez l’emplacement du conteneur Azure et le SAS (signature d’accès partagé) pour ce conteneur en utilisant les paramètres décrits ci-dessous.
tempDir
Le conteneur de stockage Azure Blob où sont stockées les données intermédiaires. Cela se présente sous la forme d’une URL, par exemple :
wasb://<conteneur_azure>@<compte_azure>.<point_de_terminaison_azure>/
temporary_azure_sas_token
Spécifiez le jeton SAS pour le stockage Azure Blob.
Pour plus de détails, voir Authentification Azure pour l’échange de données (dans ce chapitre).
Spécification des informations Azure pour le stockage temporaire dans Spark¶
Lorsque vous utilisez Azure Blob Storage pour fournir un stockage temporaire afin de transférer des données entre Spark et Snowflake, vous devez fournir à Spark, ainsi qu’au connecteur Snowflake Spark, l’emplacement et les informations d’identification du stockage temporaire.
Pour fournir l’emplacement de stockage temporaire à Spark, exécutez les commandes similaires à celles décrites ci-dessous sur votre cluster Spark :
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Notez que la dernière commande contient les variables suivantes :
<conteneur>
et<compte>
: il s’agit du conteneur et du nom du compte pour votre déploiement Azure.<point_de_terminaison_azure>
: il s’agit du point de terminaison de votre emplacement de déploiement Azure. Par exemple, si vous utilisez un déploiement US Azure, le point de terminaison est probablementblob.core.windows.net
.<azure_sas>
: il s’agit du jeton de sécurité Shared Access Signature.
Remplacez chacune de ces variables par les informations appropriées pour votre compte de stockage Azure Blob.
Transmission de paramètres de session Snowflake en tant qu’options pour le connecteur¶
Le connecteur Snowflake pour Spark prend en charge l’envoi de paramètres de session arbitraires à Snowflake (voir Paramètres de session pour plus d’informations). Ceci peut être réalisé en ajoutant une paire ("<clé>" -> "<valeur>")
à l’objet options
, où <clé>
est le nom de paramètre de session et <valeur>
est la valeur.
Note
La <valeur>
devrait être une chaîne entourée de guillemets doubles, même pour les paramètres qui acceptent les nombres ou les valeurs booléennes (par exemple "1"
ou "true"
).
Par exemple, l’exemple de code suivant transmet le paramètre de session USE_CACHED_RESULT avec une valeur de "false"
qui désactive l’utilisation des résultats des requêtes exécutées précédemment :
// ... assuming sfOptions contains Snowflake connector options
// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")
// ... now use sfOptions with the .options() method
Remarques relatives à la sécurité¶
Les clients doivent s’assurer que dans un système Spark à plusieurs nœuds, les communications entre les nœuds sont sécurisées. Le maître Spark envoie les informations d’identification Snowflake aux employés Spark afin que ces derniers puissent accéder aux zones de préparation Snowflake. Si les communications entre le maître Spark et les employés Spark ne sont pas sécurisées, les informations d’identification peuvent être lues par un tiers non autorisé.
Authentification S3 pour l’échange de données¶
Cette section décrit comment s’authentifier en cas d’utilisation de S3 pour échanger des données.
Cette tâche est seulement nécessaire dans l’une des circonstances suivantes :
La version du connecteur Snowflake pour Spark est 2.1.x (ou inférieure). Depuis la version 2.2.0, le connecteur utilise une zone de préparation interne temporaire Snowflake pour les échanges de données. Si vous n’utilisez pas actuellement la version 2.2.0 (ou supérieure) du connecteur, Snowflake recommande fortement la mise à niveau vers la dernière version.
La version du connecteur Snowflake pour Spark est 2.2.0 (ou supérieure), mais vos opérations dépassent généralement les 36 heures. C’est la durée maximale du jeton AWS utilisé par le connecteur pour accéder à la zone de préparation interne pour l’échange de données.
Si vous utilisez une ancienne version du connecteur, vous devez préparer un emplacement S3 que le connecteur peut utiliser pour échanger des données entre Snowflake et Spark.
Pour permettre l’accès au compartiment/répertoire S3 utilisé pour échanger des données entre Spark et Snowflake (comme spécifié pour tempDir
), deux méthodes d’authentification sont prises en charge :
Informations d’authentification AWS permanentes (également utilisées pour configurer l’authentification Hadoop/Spark pour accéder à S3).
Informations d’authentification AWS temporaires
Utilisation d’informations d’authentification AWS permanentes¶
Il s’agit de la méthode d’authentification AWS standard. Elle nécessite une paire de valeurs awsAccessKey
et awsSecretKey
.
Note
Ces valeurs doivent également être utilisées pour configurer Hadoop/Spark pour accéder à S3. Pour plus d’informations, y compris des exemples, voir Authentification Hadoop/Spark à l’aide de S3A ou S3N (dans ce chapitre).
Par exemple :
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>") // Then, configure your Snowflake environment // var sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<user_name>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>", "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"), "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"), "tempdir" -> "s3n://<temp-bucket-name>" )
Pour plus de détails sur les options prises en charge par sfOptions
, voir Options AWS pour le transfert de données externe (dans ce chapitre).
Authentification Hadoop/Spark à l’aide de S3A ou S3N¶
Les écosystèmes Hadoop/Spark prennent en charge 2 schémas URI pour accéder à S3 :
s3a://
Nouvelle méthode recommandée (pour Hadoop 2.7 et supérieur)
Pour utiliser cette méthode, modifiez les exemples Scala de ce chapitre pour ajouter les options de configuration Hadoop suivantes :
val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3a.access.key", <accessKey>) hadoopConf.set("fs.s3a.secret.key", <secretKey>)
Assurez-vous que l’option
tempdir
utilise égalements3a://
.s3n://
Ancienne méthode (pour Hadoop 2.6 et antérieur)
Dans certains systèmes, il est nécessaire de la spécifier explicitement comme le montre l’exemple Scala suivant :
val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>) hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)
Utilisation d’informations d’authentification AWS temporaires¶
Cette méthode utilise les options de configuration temporary_aws_access_key_id
, temporary_aws_secret_access_key
et temporary_aws_session_token
pour le connecteur.
Cette méthode permet une sécurité supplémentaire en fournissant à Snowflake un accès temporaire au compartiment/répertoire S3 utilisé pour l’échange de données.
Note
Les informations d’authentification temporaires ne peuvent être utilisées que pour configurer l’authentification S3 pour le connecteur. Elles ne peuvent être utilisées pour configurer l’authentification Hadoop/Spark.
De plus, si vous fournissez des informations d’authentification temporaires, elles ont la priorité sur toutes les informations d’authentification permanentes fournies.
L’exemple de code Scala suivant fournit un exemple d’authentification à l’aide d’identifiants temporaires :
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest import net.snowflake.spark.snowflake.Parameters // ... val sts_client = new AWSSecurityTokenServiceClient() val session_token_request = new GetSessionTokenRequest() // Set the token duration to 2 hours. session_token_request.setDurationSeconds(7200) val session_token_result = sts_client.getSessionToken(session_token_request) val session_creds = session_token_result.getCredentials() // Create a new set of Snowflake connector options, based on the existing // sfOptions definition, with additional temporary credential options that override // the credential options in sfOptions. // Note that constants from Parameters are used to guarantee correct // key names, but literal values, such as temporary_aws_access_key_id are, of course, // also allowed. var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId()) sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey()) sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())
sfOptions2
peut maintenant être utilisé avec la méthode options()
DataFrame.
Authentification Azure pour l’échange de données¶
Cette section décrit comment s’authentifier en cas d’utilisation du stockage Azure Blob pour échanger des données.
S’authentifier de cette façon est uniquement nécessaire dans l’une des circonstances suivantes :
La version du connecteur Snowflake pour Spark est 2.1.x (ou inférieure). Depuis la version 2.2.0, le connecteur utilise une zone de préparation interne temporaire Snowflake pour les échanges de données. Si vous n’utilisez pas actuellement la version 2.2.0 (ou supérieure) du connecteur, Snowflake recommande fortement la mise à niveau vers la dernière version.
La version du connecteur Snowflake pour Spark est 2.2.0 (ou supérieure), mais vos opérations dépassent généralement les 36 heures. C’est la durée maximale du jeton Azure utilisé par le connecteur pour accéder à la zone de préparation interne pour l’échange de données.
Vous devez préparer un conteneur de stockage Azure Blob que le connecteur peut utiliser pour échanger des données entre Snowflake et Spark.
Utilisation d’informations d’authentification Azure¶
Il s’agit de la méthode d’authentification standard du stockage Azure Blob. Elle nécessite une paire de valeurs : tempDir
(une URL) and des valeurs temporary_azure_sas_token
.
Note
Ces valeurs doivent également être utilisées pour configurer Hadoop/Spark pour accéder au stockage Azure Blob. Pour plus d’informations, y compris des exemples, voir Authentification Hadoop/Spark avec Azure (dans ce chapitre).
Par exemple :
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>) // Then, configure your Snowflake environment // val sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<user_name>", "sfPassword" -> "<password>", "sfDatabase" -> "<database_name>", "sfSchema" -> "<schema_name>", "sfWarehouse" -> "<warehouse_name>", "sfCompress" -> "on", "sfSSL" -> "on", "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/", "temporary_azure_sas_token" -> "<azure_sas>" )
Pour plus de détails sur les options prises en charge par sfOptions
, voir Options Azure pour le transfert de données externe (dans ce chapitre).
Authentification Hadoop/Spark avec Azure¶
Pour utiliser cette méthode, modifiez les exemples Scala de ce chapitre pour ajouter les options de configuration Hadoop suivantes :
val hadoopConf = sc.hadoopConfiguration sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Assurez-vous que l’option tempdir
utilise également wasb://
.
L’authentification via un navigateur n’est pas prise en charge¶
Lorsque vous utilisez Spark Connector, il est peu pratique d’utiliser une forme d’authentification permettant d’afficher une fenêtre de navigateur pour demander à l’utilisateur des informations d’identification. La fenêtre n’apparaîtrait pas nécessairement sur l’ordinateur client. Par conséquent, Spark Connector ne prend en charge aucun type d’authentification, y compris MFA (authentification multifactorielle) ou SSO (authentification unique), qui appellerait une fenêtre du navigateur.