Utilisation du connecteur Spark

Le connecteur adhère à l’API Spark standard, mais avec deux options supplémentaires spécifiques à Snowflake décrites dans ce chapitre.

Dans ce chapitre, le terme COPY se réfère à :

  • COPY INTO <table> (utilisé pour transférer des données d’une zone de préparation interne ou externe vers une table).

  • COPY INTO <emplacement> (utilisé pour transférer des données d’une table vers une zone de préparation interne ou externe).

Dans ce chapitre :

Vérification de la connexion réseau à Snowflake avec SnowCD

Après avoir configuré votre pilote, vous pouvez évaluer et dépanner votre connectivité réseau à Snowflake en utilisant SnowCD.

Vous pouvez utiliser SnowCD pendant le processus de configuration initiale et à la demande à tout moment pour évaluer et dépanner votre connexion réseau à Snowflake.

Pushdown

Le connecteur Spark applique un pushdown de prédicat et de requête en capturant et en analysant les plans logiques Spark pour les opérations SQL. Lorsque la source de données est Snowflake, les opérations sont traduites en une requête SQL puis exécutées dans Snowflake pour améliorer les performances.

Cependant, comme cette traduction nécessite presque de traduire un par un les opérateurs SQL Spark en expressions Snowflake, tous les opérateurs Spark SQL ne peuvent pas être push down. Lorsque le pushdown échoue, le connecteur revient à un plan d’exécution moins optimisé. Les opérations non prises en charge sont à la place exécutées dans Spark.

Note

Si vous avez besoin de la fonction pushdown pour toutes les opérations, pensez à écrire votre code pour utiliser API Snowpark à la place.

Vous trouverez ci-dessous une liste des opérations prises en charge pour le pushdown (toutes les fonctions ci-dessous utilisent leurs noms Spark). Si une fonction ne figure pas dans cette liste, un plan Spark qui l’utilise peut être exécuté sur Spark plutôt que push down à Snowflake.

  • Fonctions d’agrégation

    • Average

    • Corr

    • CovPopulation

    • CovSample

    • Count

    • Max

    • Min

    • StddevPop

    • StddevSamp

    • Sum

    • VariancePop

    • VarianceSamp

  • Opérateurs booléens

    • And

    • Between

    • Contains

    • EndsWith

    • EqualTo

    • GreaterThan

    • GreaterThanOrEqual

    • In

    • IsNull

    • IsNotNull

    • LessThan

    • LessThanOrEqual

    • Not

    • Or

    • StartsWith

  • Fonctions de date, d’heure et d’horodatage

    • DateAdd

    • DateSub

    • Mois

    • Trimestre

    • TruncDate

    • TruncTimestamp

    • Année

  • Fonctions mathématiques

    • Opérateurs arithmétiques « + » (addition), « - » (soustraction), « * » (multiplication), « / » (division) et « - » (négation unaire).

    • Abs

    • Acos

    • Asin

    • Atan

    • Ceil

    • CheckOverflow

    • Cos

    • Cosh

    • Exp

    • Floor

    • Greatest

    • Least

    • Log

    • Pi

    • Pow

    • PromotePrecision

    • Rand

    • Round

    • Sin

    • Sinh

    • Sqrt

    • Tan

    • Tanh

  • Opérateurs divers

    • Alias (expressions AS)

    • BitwiseAnd

    • BitwiseNot

    • BitwiseOr

    • BitwiseXor

    • CaseWhen

    • Cast(enfant, t, _)

    • Coalesce

    • If

    • MakeDecimal

    • ScalarSubquery

    • ShiftLeft

    • ShiftRight

    • SortOrder

    • UnscaledValue

  • Opérateurs relationnels

    • Fonctions d’agrégation et clauses group-by

    • Distinct

    • Filters

    • In

    • InSet

    • Joins

    • Limits

    • Projections

    • Sorts (ORDER BY)

    • Union and Union All

    • Fonctions de fenêtre et clauses de fenêtrage

  • Fonctions de chaîne de caractères

    • Ascii

    • Concat(enfants)

    • Longueur

    • Like

    • Lower

    • StringLPad

    • StringRPad

    • StringTranslate

    • StringTrim

    • StringTrimLeft

    • StringTrimRight

    • Substring

    • Upper

  • Fonctions de fenêtre (remarque : elles ne fonctionnent pas avec Spark 2.2)

    • DenseRank

    • Rang

    • RowNumber

Utilisation du connecteur dans Scala

Spécification du nom de classe de source de données

Pour utiliser Snowflake comme source de données dans Spark, utilisez l’option .format pour fournir le nom de classe du connecteur Snowflake qui définit la source de données.

net.snowflake.spark.snowflake

Pour assurer une vérification du nom de la classe lors de la compilation, Snowflake recommande fortement de définir une variable pour le nom de classe. Par exemple :

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
Copy

De plus, par souci de facilité, la classe Utils fournit la variable qui peut être importée comme suit :

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
Copy

Note

Tous les exemples de ce chapitre utilisent SNOWFLAKE_SOURCE_NAME comme définition de classe.

Activation/désactivation du pushdown dans une session

La version 2.1.0 (et supérieure) du connecteur prend en charge le pushdown de requêtes, ce qui peut améliorer considérablement les performances en poussant le traitement des requêtes vers Snowflake lorsque Snowflake est la source de données Spark.

Par défaut, le pushdown est activé.

Pour désactiver le pushdown dans une session Spark pour un DataFrame donné :

  1. Après avoir instancié un objet SparkSession, appelez la méthode statique SnowflakeConnectorUtils.disablePushdownSession en transmettant l’objet SparkSession. Par exemple :

    SnowflakeConnectorUtils.disablePushdownSession(spark)
    
    Copy

    spark est votre objet SparkSession.

  2. Créez un DataFrame avec l’option autopushdown définie sur off. Par exemple :

    val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("query", query)
      .option("autopushdown", "off")
      .load()
    
    Copy

    Notez que vous pouvez également définir l’option autopushdown dans un Map que vous transmettez à la méthode options (par exemple, dans sfOptions dans l’exemple ci-dessus).

Pour réactiver le pushdown après l’avoir désactivé, appelez la méthode statique SnowflakeConnectorUtils.enablePushdownSession (en transmettant l’objet SparkSession), et créez un DataFrame avec autopushdown activé.

Transfert de données depuis Snowflake vers Spark

Note

Lorsque vous utilisez des DataFrames, le connecteur Snowflake ne prend en charge que les requêtes SELECT.

Pour lire des données de Snowflake vers un DataFrame Spark :

  1. Utilisez la méthode read() de l’objet SqlContext pour construire un DataFrameReader.

  2. Spécifiez SNOWFLAKE_SOURCE_NAME en utilisant la méthode format(). Pour la définition, voir Spécification du nom de classe de source de données (dans ce chapitre).

  3. Spécifiez les options du connecteur en utilisant la méthode option() ou options(). Pour plus d’informations, voir Réglage des options de configuration du connecteur (dans ce chapitre).

  4. Spécifiez l’une des options suivantes pour les données de table à lire :

    • dbtable: le nom de la table à lire. Toutes les colonnes et tous les enregistrements sont récupérés (c’est-à-dire que cela est équivalent à SELECT * FROM db_table).

    • query: la requête exacte (instruction SELECT) à exécuter.

Notes sur l’utilisation

  • Actuellement, le connecteur ne prend pas en charge d’autres types de requête (par ex. les instructions SHOW, DESC ou DML) en cas d’utilisation de DataFrames.

  • Il y a une limite supérieure à la taille d’une ligne individuelle. Pour plus de détails, voir Limites de la taille du texte de requête.

Remarques sur les performances

Lorsque vous transférez des données entre Snowflake et Spark, utilisez les méthodes suivantes pour analyser/améliorer les performances :

  • Utilisez la méthode net.snowflake.spark.snowflake.Utils.getLastSelect() pour voir la requête actuelle émise lors du transfert de données depuis Snowflake vers Spark.

  • Si vous utilisez la fonctionnalité filter ou where du DataFrame Spark, vérifiez que les filtres respectifs sont présents dans la requête SQL émise. Le connecteur Snowflake essaie de traduire tous les filtres demandés par Spark en SQL.

    Cependant, il existe des formes de filtre que l’infrastructure Spark ne transmet pas aujourd’hui au connecteur Snowflake. Par conséquent, dans certaines situations, un grand nombre d’enregistrements inutiles sont demandés à Snowflake.

  • Si vous n’avez besoin que d’un sous-ensemble de colonnes, assurez-vous qu’il reflète le sous-ensemble dans la requête SQL.

  • En général, si la requête SQL émise ne correspond pas à ce que vous attendez sur la base des opérations de DataFrame, utilisez l’option query pour fournir la syntaxe SQL exacte que vous souhaitez.

Exemples

Lire une table entière :

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()
Copy

Lire les résultats d’une requête :

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
    .load()
Copy

Transfert de données de Spark à Snowflake

Les étapes pour enregistrer le contenu d’un DataFrame dans une table Snowflake sont similaires à celles servant à effectuer une écriture depuis Snowflake vers Spark :

  1. Utilisez la méthode write() du DataFrame pour construire un DataFrameWriter.

  2. Spécifiez SNOWFLAKE_SOURCE_NAME en utilisant la méthode format(). Pour la définition, voir Spécification du nom de classe de source de données (dans ce chapitre).

  3. Spécifiez les options du connecteur en utilisant la méthode option() ou options(). Pour plus d’informations, voir Réglage des options de configuration du connecteur (dans ce chapitre).

  4. Utilisez l’option dbtable pour spécifier la table dans laquelle les données sont écrites.

  5. Utilisez la méthode mode() pour spécifier le mode de sauvegarde du contenu.

    Pour plus d’informations, voir SaveMode (documentation Spark).

Exemples

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Exportation de JSON de Spark vers Snowflake

Les DataFrames Spark peuvent contenir des objets JSON sérialisés en tant que chaînes. Le code suivant fournit un exemple de conversion d’un DataFrame classique en DataFrame contenant des données JSON :

val rdd = myDataFrame.toJSON
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)
Copy

Notez que le résultat jsonDataFrame contient une seule colonne de type StringType. Par conséquent, lorsque ce DataFrame est exporté vers Snowflake avec le mode commun SaveMode.Overwrite, une nouvelle table est créée dans Snowflake avec une seule colonne de type VARCHAR.

Pour charger jsonDataFrame dans une colonne VARIANT :

  1. Créez une table Snowflake (connectée à Snowflake dans Java à l’aide du pilote JDBC Snowflake). Pour des explications sur les paramètres de connexion utilisés dans l’exemple, voir Référence Paramètre de connexion pilote JDBC.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class SnowflakeJDBCExample {
      public static void main(String[] args) throws Exception {
        String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.com/";
    
        Properties properties = new Properties();
        properties.put("user", "peter");
        properties.put("password", "test");
        properties.put("account", "myorganization-myaccount");
        properties.put("warehouse", "mywh");
        properties.put("db", "mydb");
        properties.put("schema", "public");
    
        // get connection
        System.out.println("Create JDBC connection");
        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
        System.out.println("Done creating JDBC connection\n");
        // create statement
        System.out.println("Create JDBC statement");
        Statement statement = connection.createStatement();
        System.out.println("Done creating JDBC statement\n");
        // create a table
        System.out.println("Create my_variant_table table");
        statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
        statement.close();
        System.out.println("Done creating demo table\n");
    
        connection.close();
        System.out.println("Close connection\n");
      }
    }
    
    Copy
  2. Au lieu d’utiliser SaveMode.Overwrite, utilisez SaveMode.Append pour réutiliser la table existante. Lorsque la valeur de chaîne représentant JSON est chargée dans Snowflake car la colonne cible est de type VARIANT, elle est analysée comme JSON. Par exemple :

    df.write
        .format(SNOWFLAKE_SOURCE_NAME)
        .options(sfOptions)
        .option("dbtable", "my_variant_table")
        .mode(SaveMode.Append)
        .save()
    
    Copy

Exécution d’instructions DDL/DML SQL

Utilisez la méthode runQuery() de l’objet Utils pour exécuter les instructions DDL/DML SQL, en plus des requêtes, par exemple :

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
    )
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")
Copy

sfOptions est la carte des paramètres utilisée pour lire/écrire des DataFrames.

La méthode runQuery ne renvoie que TRUE ou FALSE. Elle est destinée aux instructions qui ne renvoient pas de jeu de résultats, par exemple les instructions DDL telles que CREATE TABLE et DML telles que INSERT, UPDATE et DELETE. Cela n’est pas utile pour les instructions qui renvoient un jeu de résultats, tel que SELECT ou SHOW.

Utilisation d’horodatages et de fuseaux horaires

Spark ne fournit qu’un seul type d’horodatage équivalent au type d’horodatage Scala/Java. Son comportement est presque identique à celui du type de données TIMESTAMP_LTZ (fuseau horaire local) dans Snowflake. Ainsi, lorsque vous transférez des données entre Spark et Snowflake, nous vous recommandons d’utiliser les approches suivantes pour conserver l’heure correctement, par rapport aux fuseaux horaires :

  • Utilisez uniquement le type de données TIMESTAMP_LTZ dans Snowflake.

    Note

    Le mapping par défaut de type de données d’horodatage est TIMESTAMP_NTZ (aucun fuseau horaire). Vous devez donc définir explicitement le paramètre TIMESTAMP_TYPE_MAPPING pour utiliser TIMESTAMP_LTZ.

  • Réglez le fuseau horaire Spark sur UTC et utilisez ce fuseau horaire dans Snowflake (c.-à-d. ne réglez pas l’option sfTimezone pour le connecteur, et ne définissez pas explicitement un fuseau horaire dans Snowflake). Dans ce scénario, TIMESTAMP_LTZ et TIMESTAMP_NTZ sont effectivement équivalents.

    Pour régler le fuseau horaire, ajoutez la ligne suivante à votre code Spark :

    java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
    
    Copy

Si vous ne mettez pas en œuvre l’une de ces méthodes, des modifications non souhaitées peuvent survenir. Par exemple, considérons le scénario suivant :

  • Le fuseau horaire dans Spark est réglé sur America/New_York.

  • Le fuseau horaire dans Snowflake est réglé sur Europe/Warsaw, ce qui peut arriver :

    • En réglant sfTimezone sur Europe/Warsaw pour le connecteur.

    • En réglant sfTimezone sur snowflake pour le connecteur et en définissant le paramètre de session TIMEZONE dans Snowflake sur Europe/Warsaw.

  • TIMESTAMP_NTZ et TIMESTAMP_LTZ sont tous deux utilisés dans Snowflake.

Dans ce scénario :

  1. Si une valeur représentant 12:00:00 dans une colonne TIMESTAMP_NTZ dans Snowflake est envoyée à Spark, cette valeur ne contient aucune information de fuseau horaire. Spark traite la valeur comme 12:00:00 à New York.

  2. Si Spark renvoie cette valeur 12:00:00 (à New York) à Snowflake pour être chargée dans une colonne TIMESTAMP_LTZ , elle est automatiquement convertie et chargée comme 18:00:00 (pour le fuseau horaire de Varsovie).

  3. Si cette valeur est ensuite convertie en TIMESTAMP_NTZ dans Snowflake, l’utilisateur voit 18:00:00, qui est différent de la valeur originale, 12:00:00.

Pour résumer, Snowflake recommande de suivre strictement au moins une de ces règles :

  • Utilisez le même fuseau horaire, de préférence UTC pour Snowflake et Spark.

  • Utilisez seulement le type de données TIMESTAMP_LTZ pour transférer des données entre Spark et Snowflake.

Exemple de programme Scala

Important

Cet exemple de programme suppose que vous utilisez la version 2.2.0 (ou supérieure) du connecteur qui utilise une zone de préparation interne Snowflake pour stocker des données temporaires, et qui n’a donc pas besoin d’un emplacement S3 pour stocker des données temporaires. Si vous utilisez une version antérieure, vous devez avoir un emplacement S3 existant et inclure des valeurs pour tempdir, awsAccessKey, awsSecretKey pour sfOptions. Pour plus de détails, voir Options AWS pour le transfert de données externe (dans ce chapitre).

Le programme Scala suivant fournit un cas d’utilisation complet pour le connecteur Snowflake pour Spark. Avant d’utiliser le code, remplacez les chaînes suivantes par les valeurs correspondantes, comme décrit dans Réglage des options de configuration du connecteur (dans ce chapitre) :

  • <identificateur_de_compte> : votre identificateur de compte.

  • <nom_utilisateur>, <mot de passe> : identifiants de connexion pour l’utilisateur Snowflake.

  • <base_de_données>, <schéma>, <entrepôt>: valeurs par défaut de la session Snowflake.

L’exemple de programme Scala utilise l’authentification de base (c’est-à-dire le nom d’utilisateur et le mot de passe). Si vous souhaitez vous authentifier avec OAuth, consultez Utilisation de l’OAuth externe (dans ce chapitre).

import org.apache.spark.sql._

//
// Configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) from t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Utilisation du connecteur avec Python

L’utilisation du connecteur avec Python est très similaire à l’utilisation de Scala.

Nous recommandons d’utiliser le script bin/pyspark inclus dans la distribution Spark.

Configuration du script pyspark

Le script pyspark doit être configuré de la même manière que le script spark-shell , en utilisant les options --packages ou --jars. Par exemple :

bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3
Copy

N’oubliez pas d’inclure les fichiers .jar du connecteur Snowflake Spark et du connecteur JDBC dans votre variable d’environnement CLASSPATH.

Pour plus d’informations sur la configuration du script spark-shell , voir Étape 4 : Configuration du cluster local Spark ou de l’environnement Spark hébergé sur Amazon EMR.

Activation/désactivation du pushdown dans une session

La version 2.1.0 (et supérieure) du connecteur prend en charge le pushdown de requêtes, ce qui peut améliorer considérablement les performances en poussant le traitement des requêtes vers Snowflake lorsque Snowflake est la source de données Spark.

Par défaut, le pushdown est activé.

Pour désactiver le pushdown dans une session Spark pour un DataFrame donné :

  1. Après avoir instancié un objet SparkSession, appelez la méthode statique SnowflakeConnectorUtils.disablePushdownSession en transmettant l’objet SparkSession. Par exemple :

    sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
    
    Copy
  2. Créez un DataFrame avec l’option autopushdown définie sur off. Par exemple :

    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sfOptions) \
      .option("query",  query) \
      .option("autopushdown", "off") \
      .load()
    
    Copy

    Notez que vous pouvez également définir l’option autopushdown dans un Dictionary que vous transmettez à la méthode options (par exemple, dans sfOptions dans l’exemple ci-dessus).

Pour réactiver le pushdown après l’avoir désactivé, appelez la méthode statique SnowflakeConnectorUtils.enablePushdownSession (en transmettant l’objet SparkSession), et créez un DataFrame avec autopushdown activé.

Exemple de script Python

Important

Cet exemple de script suppose que vous utilisez la version 2.2.0 (ou supérieure) du connecteur qui utilise une zone de préparation interne Snowflake pour stocker des données temporaires, et qui n’a donc pas besoin d’un emplacement S3 pour stocker ces données. Si vous utilisez une version antérieure, vous devez avoir un emplacement S3 existant et inclure des valeurs pour tempdir, awsAccessKey, awsSecretKey pour sfOptions. Pour plus de détails, voir Options AWS pour le transfert de données externe (dans ce chapitre).

Une fois le script pyspark configuré, vous pouvez effectuer des requêtes SQL et d’autres opérations. Voici un exemple de script Python qui effectue une simple requête SQL. Ce script illustre l’utilisation de base des connecteurs. La plupart des exemples Scala de ce document peuvent être adaptés avec un minimum d’efforts/de modifications pour une utilisation avec Python.

L’exemple de script Python utilise l’authentification de base (c’est-à-dire le nom d’utilisateur et le mot de passe). Si vous souhaitez vous authentifier avec OAuth, consultez Utilisation de l’OAuth externe (dans ce chapitre).

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()
Copy

Astuce

Notez l’utilisation de sfOptions et SNOWFLAKE_SOURCE_NAME. Cela simplifie le code et réduit les risques d’erreur.

Pour plus de détails sur les options prises en charge pour sfOptions, voir Réglage des options de configuration du connecteur (dans ce chapitre).

Mappages de type de données

Le connecteur Spark prend en charge la conversion entre de nombreux types de données classiques.

De SQL Spark vers Snowflake

Type de données Spark

Type de données Snowflake

ArrayType

VARIANT

BinaryType

Non pris en charge

BooleanType

BOOLEAN

ByteType

INTEGER. Snowflake ne prend pas en charge le type BYTE.

DateType

DATE

DecimalType

DECIMAL

DoubleType

DOUBLE

FloatType

FLOAT

IntegerType

INTEGER

LongType

INTEGER

MapType

VARIANT

ShortType

INTEGER

StringType

Si une longueur est spécifiée, VARCHAR(N) ; autrement, VARCHAR

StructType

VARIANT

TimestampType

TIMESTAMP

De Snowflake vers SQL Spark.

Type de données Snowflake

Type de données Spark

ARRAY

StringType

BIGINT

DecimalType(38, 0)

BINARY

Non pris en charge

BLOB

Non pris en charge

BOOLEAN

BooleanType

CHAR

StringType

CLOB

StringType

DATE

DateType

DECIMAL

DecimalType

DOUBLE

DoubleType

FLOAT

DoubleType

INTEGER

DecimalType(38, 0)

OBJECT

StringType

TIMESTAMP

TimestampType

TIME

StringType (Version du connecteur Spark 2.4.14 ou ultérieure)

VARIANT

StringType

Appel de la méthode DataFrame.show

Si vous appelez la méthode DataFrame.show et transmettez un nombre inférieur au nombre de lignes dans le DataFrame, construisez un DataFrame qui contient uniquement les lignes à afficher dans un ordre trié.

Pour ce faire :

  1. Appelez d’abord la méthode sort pour renvoyer un DataFrame qui contient des lignes triées.

  2. Appelez la méthode limit sur ce DataFrame pour renvoyer un DataFrame qui ne contient que les lignes que vous voulez afficher.

  3. Appelez la méthode show sur le DataFrame retourné.

Par exemple, si vous voulez afficher 5 lignes et que vous voulez que les résultats soient triés par la colonne my_col :

val dfWithRowsToShow = originalDf.sort("my_col").limit(5)
dfWithRowsToShow.show(5)
Copy

Sinon, si vous appelez show pour afficher un sous-ensemble de lignes dans DataFrame, différentes exécutions du code pourraient entraîner l’affichage de différentes lignes.

Réglage des options de configuration du connecteur

Les sections suivantes énumèrent les options que vous définissez pour configurer le comportement du connecteur :

Pour définir ces options, appelez la méthode .option(<key>, <valeur>) ou .options(<map>) de la classe Spark DataframeReader.

Astuce

Pour faciliter l’utilisation des options, Snowflake recommande de spécifier les options dans un seul objet Map et d’appeler .options(<map>) pour définir les options.

Options de connexion requises

Les options suivantes sont requises pour la connexion à Snowflake :

sfUrl

Spécifie le nom d’hôte de votre compte au format suivant :

account_identifier.snowflakecomputing.com

account_identifier est votre identificateur de compte.

sfUser

Nom de connexion de l’utilisateur Snowflake.

Vous devez utiliser l’une des options suivantes pour vous authentifier :

  • sfPassword

    Mot de passe de l’utilisateur Snowflake.

  • pem_private_key

    Clé privée (au format PEM) pour l’authentification par paire de clés. Pour obtenir des instructions, voir Authentification par paire de clés et rotation de paires de clés.

  • sfAuthenticator

    Spécifie l’utilisation de External OAuth pour s’authentifier auprès de Snowflake. Définissez la valeur sur oauth.

    L’utilisation de l’OAuth externe nécessite de définir le paramètre sfToken.

sfToken

(Nécessaire en cas d’utilisation de External OAuth) Définissez la valeur sur votre jeton d’accès External OAuth.

Le paramètre de connexion nécessite de définir la valeur du paramètre sfAuthenticator sur oauth.

La valeur par défaut est aucun.

Options de contexte requises

Les options suivantes sont nécessaires pour définir le contexte de base de données et de schéma de la session :

sfDatabase

La base de données à utiliser pour la session après la connexion.

sfSchema

Le schéma à utiliser pour la session après la connexion.

Options de contexte supplémentaires

Les options énumérées dans cette section ne sont pas obligatoires.

sfAccount

Identificateur du compte (par exemple myorganization-myaccount). Cette option n’est plus nécessaire car l’identificateur du compte est spécifié dans sfUrl. Elle n’est documentée ici que pour des raisons de rétrocompatibilité.

sfWarehouse

L’entrepôt virtuel par défaut à utiliser pour la session après la connexion.

sfRole

Le rôle de sécurité par défaut à utiliser pour la session après la connexion.

Options de proxy

Les options énumérées dans cette section ne sont pas obligatoires.

use_proxy

Indique si le connecteur doit utiliser un proxy :

  • true spécifie que le connecteur doit utiliser un proxy.

  • false spécifie que le connecteur ne doit pas utiliser de proxy.

La valeur par défaut est false.

proxy_host

(Nécessaire si use_proxy est true) Spécifie le nom d’hôte du serveur proxy à utiliser.

proxy_port

(Nécessaire si use_proxy est true) Spécifie le numéro de port du serveur proxy à utiliser.

proxy_protocol

Spécifie le protocole utilisé pour se connecter au serveur proxy. Vous pouvez spécifier l’une des valeurs suivantes :

  • http

  • https

La valeur par défaut est http.

Ceci n’est pris en charge que pour Snowflake sur AWS.

Cette option a été ajoutée dans la version 2.11.1 du Connecteur Spark.

proxy_user

Spécifie le nom d’utilisateur pour l’authentification au serveur proxy. Définissez cette option si le serveur proxy nécessite une authentification.

Ceci n’est pris en charge que pour Snowflake sur AWS.

proxy_password

Spécifie le mot de passe de proxy_user pour l’authentification au serveur proxy. Définissez cette option si le serveur proxy nécessite une authentification.

Ceci n’est pris en charge que pour Snowflake sur AWS.

non_proxy_hosts

Spécifie la liste des hôtes auxquels le connecteur doit se connecter directement, sans passer par le serveur proxy.

Séparez les noms d’hôtes par un symbole barre verticale avec échappement d’URL(%7C). Vous pouvez également utiliser un astérisque (*) comme caractère générique.

Ceci n’est pris en charge que pour Snowflake sur AWS.

Options supplémentaires

Les options énumérées dans cette section ne sont pas obligatoires.

sfTimezone

Le fuseau horaire à utiliser par Snowflake en cas d’utilisation de Spark. Notez que le paramètre définit uniquement le fuseau horaire dans Snowflake. L’environnement Spark reste inchangé. Les valeurs prises en charge sont les suivantes :

  • spark: utilisez le fuseau horaire de Spark (par défaut).

  • snowflake: utilisez le fuseau horaire actuel pour Snowflake.

  • sf_default: utilisez le fuseau horaire par défaut pour l’utilisateur Snowflake qui se connecte.

  • time_zone : Utilisez un fuseau horaire spécifique (par ex. America/New_York), si valide.

    Pour plus d’informations sur les conséquences de cette option, voir Utilisation d’horodatages et de fuseaux horaires (dans ce chapitre).

sfCompress

Si l’option est réglée sur on (par défaut), les données transmises entre Snowflake et Spark sont compressées.

s3MaxFileSize

La taille du fichier utilisé lors du transfert de données depuis Snowflake vers Spark. La valeur par défaut est 10MB.

preactions

Une liste de commandes SQL séparées par des points-virgules qui sont exécutées avant le transfert des données entre Spark et Snowflake.

Si une commande SQL contient %s, %s est remplacée par le nom de la table référencée pour l’opération.

postactions

Une liste de commandes SQL séparées par des points-virgules qui sont exécutées après le transfert des données entre Spark et Snowflake.

Si une commande SQL contient %s, elle est remplacée par le nom de la table référencée pour l’opération.

truncate_columns

Si l’option est définie sur on (par défaut), une commande COPY tronque automatiquement les chaînes de texte dépassant la longueur de la colonne cible. Si l’option est définie sur off, la commande produit une erreur si une chaîne chargée dépasse la longueur de la colonne cible.

truncate_table

Ce paramètre contrôle si le schéma d’une table cible Snowflake est conservé lorsque la table est écrasée.

Par défaut, lorsqu’une table cible dans Snowflake est écrasée, le schéma de cette table cible est également écrasé. Le nouveau schéma est basé sur le schéma de la table source (le dataframe Spark).

Cependant, le schéma de la source n’est parfois pas idéal. Par exemple, un utilisateur peut vouloir qu’une table cible Snowflake puisse stocker des valeurs FLOAT à l’avenir, même si le type de données de la colonne source initiale est INTEGER. Dans ce cas, le schéma de la table Snowflake ne doit pas être écrasé. La table Snowflake doit simplement être tronquée et réutilisée avec son schéma actuel.

Les valeurs possibles de ce paramètre sont :

  • on

  • off

Si ce paramètre est on, le schéma original de la table cible est conservé. Si ce paramètre est off, alors l’ancien schéma de la table est ignoré, et un nouveau schéma est généré sur la base du schéma de la source.

Ce paramètre est facultatif.

La valeur par défaut de ce paramètre est off (c’est-à-dire que, par défaut, le schéma de la table originale est écrasé).

Pour plus de détails sur le mappage de types de données Spark à des types de données Snowflake (et vice versa), voir Mappages de type de données (dans ce chapitre).

continue_on_error

Cette variable contrôle si la commande COPY abandonne si l’utilisateur saisit des données non valides (par exemple, format JSON invalide pour une colonne de type de données variable).

Les valeurs possibles sont :

  • on

  • off

La valeur on signifie continuer même si une erreur se produit. La valeur off signifie annuler si une erreur est détectée.

Ce paramètre est facultatif.

La valeur par défaut de ce paramètre est off.

Il n’est pas recommandé d’activer cette option. Si des erreurs sont détectées lors du processus de COPYing vers Snowflake à l’aide du connecteur Spark, alors certaines données peuvent être absentes.

Note

Si des lignes sont rejetées ou absents, et que ces lignes ne sont pas clairement défectueuses dans la source d’entrée, veuillez le signaler à Snowflake.

usestagingtable

Ce paramètre contrôle si le chargement de données utilise une table de mise en zone de préparation.

Une table d’échelonnement est une table normale (avec un nom temporaire) qui est créée par le connecteur. Si l’opération de chargement des données est réussie, la table cible originale est détruite et la table d’échelonnement est renommée au nom de la table cible d’origine. Si l’opération de chargement des données échoue, la table d’échelonnement est détruite et la table cible se retrouve avec les données qu’elle avait immédiatement avant l’opération. Ainsi, la table d’échelonnement permet de conserver les données de la table cible d’origine en cas d’échec de l’opération. Pour des raisons de sécurité, Snowflake recommande fortement l’utilisation d’une table d’échelonnement dans la plupart des cas.

Pour que le connecteur puisse créer une table d’échelonnement, l’utilisateur exécutant le COPY via le connecteur Spark doit avoir suffisamment de privilèges pour créer une table. Le chargement direct (c’est-à-dire le chargement sans utiliser de table d’échelonnement) est utile si l’utilisateur n’a pas la permission de créer une table.

Les valeurs possibles de ce paramètre sont :

  • on

  • off

Si le paramètre est on, une table d’échelonnement est utilisée. Si ce paramètre est off, les données sont chargées directement dans la table cible.

Ce paramètre est facultatif.

La valeur par défaut de ce paramètre est on (c.-à-d., utiliser une table de mise en zone de préparation).

autopushdown

Ce paramètre contrôle si le pushdown automatique de requêtes est activé.

Si le pushdown est activé, alors, lorsqu’une requête est exécutée sur Spark, si une partie de la requête peut être « poussée vers le bas » vers le serveur Snowflake, elle est alors poussée vers le bas. Ceci améliore les performances de certaines requêtes.

Ce paramètre est facultatif.

La valeur par défaut est on si le connecteur est connecté à une version compatible de Spark. Sinon, la valeur par défaut est off.

Si le connecteur est connecté à une version différente de Spark que celle à laquelle il est destiné (par exemple, si la version 3.2 du connecteur est connectée à la version 3.3 de Spark), l’auto-pushdown est désactivé même si ce paramètre est réglé sur on.

purge

Si cette option est définie sur on, le connecteur supprime les fichiers temporaires créés lors du transfert depuis Spark vers Snowflake via un transfert de données externe. Si ce paramètre est réglé sur off, ces fichiers ne sont pas automatiquement supprimés par le connecteur.

La purge ne fonctionne que pour les transferts de Spark vers Snowflake, et non pour les transferts de Snowflake vers Spark.

Les valeurs possibles sont :

  • on

  • off

La valeur par défaut est off.

columnmap

Ce paramètre est utile lorsque vous écrivez des données de Spark dans Snowflake et que les noms de colonne de la table Snowflake ne correspondent pas à ceux de la table Spark. Vous pouvez créer une carte qui indique quelle colonne source Spark correspond à quelle colonne cible Snowflake.

Le paramètre est un littéral de chaîne unique, sous la forme suivante :

"Map(col_2 -> col_b, col_3 -> col_a)"

Par exemple, considérons le scénario suivant :

  • Supposons qu’un Dataframe nommé df dans Spark ait trois colonnes :

    col_1 , col_2 , col_3

  • Une table nommée tb dans Snowflake a deux colonnes :

    col_a , col_b

  • Vous souhaitez copier les valeurs suivantes :

    • De df.col_2 à tb.col_b.

    • De df.col_3 à tb.col_a.

La valeur du paramètre columnmap serait :

Map(col_2 -> col_b, col_3 -> col_a)

Vous pouvez générer cette valeur en exécutant le code Scala suivant :

Map("col_2"->"col_b","col_3"->"col_a").toString()

La valeur par défaut de ce paramètre est nulle. En d’autres termes, par défaut, les noms de colonne des tables source et cible doivent correspondre.

Ce paramètre n’est utilisé que pour effectuer une écriture depuis Spark vers Snowflake. Il ne s’applique pas pour une écriture depuis Snowflake vers Spark.

keep_column_case

Lors de l’écriture d’une table de Spark dans Snowflake, le connecteur Spark décale par défaut les lettres des noms de colonnes en majuscules, à moins que les noms de colonnes ne soient entre guillemets.

Lors de l’écriture d’une table de Snowflake dans Spark, le connecteur Spark ajoute par défaut des guillemets autour de tout nom de colonne contenant des caractères, à l’exception des lettres majuscules, des caractères de soulignement et des chiffres.

Si vous définissez keep_column_case sur on, le connecteur Spark n’effectuera pas ces modifications.

Les valeurs possibles sont :

  • on

  • off

La valeur par défaut est off.

column_mapping

Le connecteur doit mapper les colonnes du cadre de données Spark à la table Snowflake. Cela peut être effectué en fonction des noms de colonne (quel que soit leur ordre) ou de leur ordre (c’est-à-dire que la première colonne du cadre de données est mappée avec la première colonne de la table, quel que soit le nom de la colonne).

Par défaut, le mappage est effectué en fonction de l’ordre. Vous pouvez remplacer ce paramètre en définissant ce paramètre sur name, qui indique au connecteur de mapper les colonnes en fonction de leurs noms. (La correspondance de noms est insensible à la casse.)

Les valeurs possibles de ce paramètre sont :

  • order

  • name

La valeur par défaut est order.

column_mismatch_behavior

Ce paramètre ne s’applique que lorsque le paramètre column_mapping est défini sur name.

Si les noms de colonne dans le cadre de données Spark et la table de Snowflake ne correspondent pas, alors :

  • Si column_mismatch_behavior est error, Spark Connector signale une erreur.

  • Si column_mismatch_behavior est ignore, Spark Connector ignore l’erreur.

    • Le pilote supprime toute colonne du cadre de données Spark qui n’a pas de colonne correspondante dans la table Snowflake.

    • Le pilote insère NULLs dans n’importe quelle colonne de la table Snowflake qui n’a pas de colonne correspondante dans le cadre de données Spark.

Les erreurs potentielles incluent :

  • Le cadre de données Spark peut contenir des colonnes identiques à l’exception de la casse (majuscules/minuscules). Étant donné que le mappage des noms de colonne ne respecte pas la casse, il n’est pas possible de déterminer le mappage correct du cadre de données à la table.

  • La table Snowflake peut contenir des colonnes identiques à l’exception de la casse (majuscules/minuscules). Étant donné que le mappage des noms de colonne ne respecte pas la casse, il n’est pas possible de déterminer le mappage correct du cadre de données à la table.

  • Le cadre de données Spark et la table Snowflake peuvent ne pas avoir de noms de colonnes en commun. En théorie, le connecteur Spark pourrait insérer NULLs dans chaque colonne de chaque ligne, mais cela n’a généralement pas de sens. Le connecteur renvoie donc une erreur même si column_mismatch_behavior est défini sur ignore.

Les valeurs possibles de ce paramètre sont :

  • error

  • ignore

La valeur par défaut est error.

time_output_format

Ce paramètre permet à l’utilisateur de spécifier le format des données TIME renvoyées.

Les valeurs possibles de ce paramètre sont les valeurs possibles pour les formats d’heure spécifiés à Formats d’heure.

Ce paramètre affecte uniquement la sortie, pas l’entrée.

timestamp_ntz_output_format, . timestamp_ltz_output_format, . timestamp_tz_output_format

Ces options spécifient le format de sortie des valeurs d’horodatage. Les valeurs par défaut de ces options sont les suivantes :

Option de configuration

Valeur par défaut

timestamp_ntz_output_format

"YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_ltz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_tz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

Si ces options ont pour valeur "sf_current", le connecteur utilise les formats spécifiés pour la session.

partition_size_in_mb

Ce paramètre est utilisé lorsque l’ensemble de résultats de la requête est très volumineux et doit être divisé en plusieurs partitions DataFrame. Ce paramètre spécifie la taille non compressée recommandée pour chaque partition DataFrame. Pour réduire le nombre de partitions, augmentez cette taille.

Cette taille est utilisée comme taille recommandée, la taille réelle des partitions peut être plus petite ou plus grande.

Cette option s’applique uniquement lorsque le paramètre use_copy_unload est FALSE.

Ce paramètre est facultatif.

La valeur par défaut est 100 (MB).

use_copy_unload

Si la valeur est FALSE, Snowflake utilise le format de données Arrow lors de la [sélection] SELECTing de données. Si la valeur est TRUE, Snowflake revient à l’ancien comportement consistant à utiliser la commande COPY UNLOAD pour transmettre les données sélectionnées.

Ce paramètre est facultatif.

La valeur par défaut est FALSE.

treat_decimal_as_long

Si TRUE, configure le connecteur Spark pour retourner des valeurs Long (plutôt que des valeurs BigDecimal) pour les requêtes qui retournent le type Decimal(precision, 0).

La valeur par défaut est FALSE.

Cette option a été ajoutée dans la version 2.11.1 du Connecteur Spark.

s3_stage_vpce_dns_name

Spécifie le nom DNS du point de terminaison de votre VPCpour l’accès aux zones de préparation internes.

Cette option a été ajoutée dans la version 2.11.1 du Connecteur Spark.

support_share_connection

Si FALSE, configure le connecteur Spark pour qu’il crée une nouvelle connexion JDBC pour chaque tâche ou action qui utilise les mêmes options du connecteur Spark pour accéder à Snowflake.

La valeur par défaut est TRUE, ce qui signifie que les différentes tâches et actions partagent la même connexion JDBC s’ils utilisent les mêmes options du connecteur Spark pour accéder à Snowflake.

Si vous devez activer ou désactiver ce paramètre par programmation, utilisez les fonctions statiques globales suivantes :

  • SparkConnectorContext.disableSharedConnection()

  • SparkConnectorContext.enableSharingJDBCConnection()

Note

Dans les cas particuliers suivants, le connecteur Spark n’utilise pas de connexion JDBC partagée :

  • Si des préactions ou postactions sont définies et que ces préactions ou postactions ne sont pas CREATE TABLE, DROP TABLE ou MERGE INTO, le connecteur Spark n’utilise pas la connexion partagée.

  • Les fonctions utilitaires telles que Utils.runQuery() et Utils.getJDBCConnection() n’utilisent pas la connexion partagée.

Cette option a été ajoutée dans la version 2.11.2 du connecteur Spark.

force_skip_pre_post_action_check_for_shared_session

Si TRUE, configure le connecteur Spark pour qu’il désactive la validation des préactions et des postactions pour le partage de session.

La valeur par défaut est FALSE.

Important

Avant de définir cette option, assurez-vous que les requêtes dans préactions et postactions n’affectent pas les paramètres de la session. Dans le cas contraire, vous risquez de ne pas obtenir les résultats escomptés.

Cette option a été ajoutée dans la version 2.11.3 du connecteur Spark.

Utilisation de l’authentification par paires de clés et rotation des paires de clés

Le connecteur Spark prend en charge l’authentification par paire de clés et la rotation des clés.

  1. Pour commencer, complétez la configuration initiale pour l’authentification de la paire de clés comme indiqué dans Authentification par paire de clés et rotation de paires de clés.

  2. Envoyez une copie non chiffrée de la clé privée à l’aide de l’option de connexion pem_private_key .

Attention

Pour des raisons de sécurité, plutôt que de coder en dur la pem_private_key dans votre application, vous devez définir le paramètre de manière dynamique après avoir lu la clé à partir d’une source sécurisée. Si la clé est chiffrée, déchiffrez-la et envoyez la version déchiffrée.

Dans l’exemple Python, notez que le fichier pem_private_key, rsa_key.p8, est :

  • Est directement lu à partir d’un fichier protégé par mot de passe, à l’aide de la variable d’environnement PRIVATE_KEY_PASSPHRASE.

  • Utilise l’expression pkb dans la chaîne sfOptions.

Pour vous connecter, vous pouvez enregistrer l’exemple Python dans un fichier (par ex. : <file.py>) puis exécuter la commande suivante :

spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>
Copy

Python

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os

with open("<path>/rsa_key.p8", "rb") as key_file:
  p_key = serialization.load_pem_private_key(
    key_file.read(),
    password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
    backend = default_backend()
    )

pkb = p_key.private_bytes(
  encoding = serialization.Encoding.PEM,
  format = serialization.PrivateFormat.PKCS8,
  encryption_algorithm = serialization.NoEncryption()
  )

pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')

sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "pem_private_key" : pkb,
  "sfDatabase" : "<database>",
  "sfSchema" : "schema",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "COLORS") \
    .load()

df.show()
Copy

Utilisation de l’OAuth externe

À partir de la version du connecteur Spark 2.7.0, vous pouvez utiliser External OAuth pour vous authentifier auprès de Snowflake à l’aide de l’exemple de programme Scala ou de l’exemple de script Python.

Avant d’utiliser OAuth externe et le connecteur Spark pour vous authentifier auprès de Snowflake, configurez une intégration de sécurité OAuth externe pour l’un des serveurs d’autorisation OAuth externe pris en charge ou un client personnalisé External OAuth externe.

Dans les exemples Scala et Python, notez le remplacement du paramètre sfPassword par les paramètres sfAuthenticator et sfToken.

Scala :

// spark connector version

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.DataFrame

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<username>",
    "sfAuthenticator" -> "oauth",
    "sfToken" -> "<external_oauth_access_token>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "region")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Python :

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfAuthenticator" : "oauth",
  "sfToken" : "<external_oauth_access_token>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()
Copy

Options AWS pour le transfert de données externe

Ces options sont utilisées pour spécifier l’emplacement Amazon S3 où les données temporaires sont stockées, et fournissent des détails d’authentification pour accéder à cet emplacement. Ils sont uniquement requis si vous effectuez un transfert de données externe. Les transferts de données externes sont nécessaires si l’une des affirmations suivantes est vraie :

  • Vous utilisez la version 2.1.x ou une version inférieure du connecteur Spark (qui ne prend pas en charge les transferts internes).

  • Votre transfert prendra probablement 36 heures ou plus (les transferts internes utilisent des informations d’authentification temporaires qui expirent après 36 heures).

tempDir

L’emplacement S3 où sont stockées les données préparées (par exemple s3n://xy12345-bucket/spark-snowflake-tmp/).

Si tempDir est spécifié, vous devez également spécifier :

  • awsAccessKey , awsSecretKey . ou

  • temporary_aws_access_key_id , temporary_aws_secret_access_key, temporary_aws_session_token

awsAccessKey , awsSecretKey

Il s’agit d’informations d’authentification AWS standard qui permettent d’accéder à l’emplacement spécifié dans tempDir. Notez que ces deux options doivent être définies ensemble.

Si elles sont définies, elles peuvent être récupérées depuis un objet SparkContext existant.

Si vous spécifiez ces variables, vous devez également spécifier tempDir.

Ces informations d’authentification doivent également être définies pour le cluster Hadoop.

temporary_aws_access_key_id , temporary_aws_secret_access_key, temporary_aws_session_token

Il s’agit d’informations d’authentification AWS temporaires qui permettent d’accéder à l’emplacement spécifié dans tempDir. Notez que ces trois options doivent être définies ensemble.

De plus, si ces options sont définies, elles ont priorité sur les options awsAccessKey et awsSecretKey.

Si vous spécifiez temporary_aws_access_key_id, temporary_aws_secret_access_key et temporary_aws_session_token, vous devez également spécifier tempDir. Sinon, ces paramètres sont ignorés.

check_bucket_configuration

Si le paramètre est réglé sur on (par défaut), le connecteur vérifie si le compartiment utilisé pour le transfert de données possède une politique de cycle de vie configurée (voir Préparation d’un compartiment S3 AWS externe pour plus d’informations). S’il n’y a aucune politique de cycle de vie, un avertissement est enregistré.

La désactivation de cette option (en la réglant sur off) ignore cette vérification. Ceci peut être utile si un utilisateur peut accéder aux opérations de données de compartiment, mais pas aux politiques de cycle de vie du compartiment. La désactivation de cette option peut également accélérer légèrement les temps d’exécution des requêtes.

Pour plus de détails, voir Authentification S3 pour l’échange de données (dans ce chapitre).

Options Azure pour le transfert de données externe

Cette section décrit les paramètres qui s’appliquent au stockage Azure Blob lors de transferts de données externes. Les transferts de données externes sont nécessaires si l’une des affirmations suivantes est vraie :

  • Vous utilisez la version 2.1.x ou une version inférieure du connecteur Spark (qui ne prend pas en charge les transferts internes).

  • Votre transfert prendra probablement 36 heures ou plus (les transferts internes utilisent des informations d’authentification temporaires qui expirent après 36 heures).

Lorsque vous utilisez un transfert externe avec le stockage Azure Blob, vous spécifiez l’emplacement du conteneur Azure et le SAS (signature d’accès partagé) pour ce conteneur en utilisant les paramètres décrits ci-dessous.

tempDir

Le conteneur de stockage Azure Blob où sont stockées les données intermédiaires. Cela se présente sous la forme d’une URL, par exemple :

wasb://<conteneur_azure>@<compte_azure>.<point_de_terminaison_azure>/

temporary_azure_sas_token

Spécifiez le jeton SAS pour le stockage Azure Blob.

Pour plus de détails, voir Authentification Azure pour l’échange de données (dans ce chapitre).

Spécification des informations Azure pour le stockage temporaire dans Spark

Lorsque vous utilisez Azure Blob Storage pour fournir un stockage temporaire afin de transférer des données entre Spark et Snowflake, vous devez fournir à Spark, ainsi qu’au connecteur Snowflake Spark, l’emplacement et les informations d’identification du stockage temporaire.

Pour fournir l’emplacement de stockage temporaire à Spark, exécutez les commandes similaires à celles décrites ci-dessous sur votre cluster Spark :

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Copy

Notez que la dernière commande contient les variables suivantes :

  • <conteneur> et <compte> : il s’agit du conteneur et du nom du compte pour votre déploiement Azure.

  • <point_de_terminaison_azure> : il s’agit du point de terminaison de votre emplacement de déploiement Azure. Par exemple, si vous utilisez un déploiement US Azure, le point de terminaison est probablement blob.core.windows.net.

  • <azure_sas> : il s’agit du jeton de sécurité Shared Access Signature.

Remplacez chacune de ces variables par les informations appropriées pour votre compte de stockage Azure Blob.

Transmission de paramètres de session Snowflake en tant qu’options pour le connecteur

Le connecteur Snowflake pour Spark prend en charge l’envoi de paramètres de session arbitraires à Snowflake (voir Paramètres de session pour plus d’informations). Ceci peut être réalisé en ajoutant une paire ("<clé>" -> "<valeur>") à l’objet options, où <clé> est le nom de paramètre de session et <valeur> est la valeur.

Note

La <valeur> devrait être une chaîne entourée de guillemets doubles, même pour les paramètres qui acceptent les nombres ou les valeurs booléennes (par exemple "1" ou "true").

Par exemple, l’exemple de code suivant transmet le paramètre de session USE_CACHED_RESULT avec une valeur de "false" qui désactive l’utilisation des résultats des requêtes exécutées précédemment :

// ... assuming sfOptions contains Snowflake connector options

// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method
Copy

Remarques relatives à la sécurité

Les clients doivent s’assurer que dans un système Spark à plusieurs nœuds, les communications entre les nœuds sont sécurisées. Le maître Spark envoie les informations d’identification Snowflake aux employés Spark afin que ces derniers puissent accéder aux zones de préparation Snowflake. Si les communications entre le maître Spark et les employés Spark ne sont pas sécurisées, les informations d’identification peuvent être lues par un tiers non autorisé.

Authentification S3 pour l’échange de données

Cette section décrit comment s’authentifier en cas d’utilisation de S3 pour échanger des données.

Cette tâche est seulement nécessaire dans l’une des circonstances suivantes :

  • La version du connecteur Snowflake pour Spark est 2.1.x (ou inférieure). Depuis la version 2.2.0, le connecteur utilise une zone de préparation interne temporaire Snowflake pour les échanges de données. Si vous n’utilisez pas actuellement la version 2.2.0 (ou supérieure) du connecteur, Snowflake recommande fortement la mise à niveau vers la dernière version.

  • La version du connecteur Snowflake pour Spark est 2.2.0 (ou supérieure), mais vos opérations dépassent généralement les 36 heures. C’est la durée maximale du jeton AWS utilisé par le connecteur pour accéder à la zone de préparation interne pour l’échange de données.

Si vous utilisez une ancienne version du connecteur, vous devez préparer un emplacement S3 que le connecteur peut utiliser pour échanger des données entre Snowflake et Spark.

Pour permettre l’accès au compartiment/répertoire S3 utilisé pour échanger des données entre Spark et Snowflake (comme spécifié pour tempDir), deux méthodes d’authentification sont prises en charge :

  • Informations d’authentification AWS permanentes (également utilisées pour configurer l’authentification Hadoop/Spark pour accéder à S3).

  • Informations d’authentification AWS temporaires

Utilisation d’informations d’authentification AWS permanentes

Il s’agit de la méthode d’authentification AWS standard. Elle nécessite une paire de valeurs awsAccessKey et awsSecretKey.

Note

Ces valeurs doivent également être utilisées pour configurer Hadoop/Spark pour accéder à S3. Pour plus d’informations, y compris des exemples, voir Authentification Hadoop/Spark à l’aide de S3A ou S3N (dans ce chapitre).

Par exemple :

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)
Copy

Pour plus de détails sur les options prises en charge par sfOptions, voir Options AWS pour le transfert de données externe (dans ce chapitre).

Authentification Hadoop/Spark à l’aide de S3A ou S3N

Les écosystèmes Hadoop/Spark prennent en charge 2 schémas URI pour accéder à S3 :

s3a://

Nouvelle méthode recommandée (pour Hadoop 2.7 et supérieur)

Pour utiliser cette méthode, modifiez les exemples Scala de ce chapitre pour ajouter les options de configuration Hadoop suivantes :

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)
Copy

Assurez-vous que l’option tempdir utilise également s3a://.

s3n://

Ancienne méthode (pour Hadoop 2.6 et antérieur)

Dans certains systèmes, il est nécessaire de la spécifier explicitement comme le montre l’exemple Scala suivant :

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)
Copy

Utilisation d’informations d’authentification AWS temporaires

Cette méthode utilise les options de configuration temporary_aws_access_key_id, temporary_aws_secret_access_key et temporary_aws_session_token pour le connecteur.

Cette méthode permet une sécurité supplémentaire en fournissant à Snowflake un accès temporaire au compartiment/répertoire S3 utilisé pour l’échange de données.

Note

Les informations d’authentification temporaires ne peuvent être utilisées que pour configurer l’authentification S3 pour le connecteur. Elles ne peuvent être utilisées pour configurer l’authentification Hadoop/Spark.

De plus, si vous fournissez des informations d’authentification temporaires, elles ont la priorité sur toutes les informations d’authentification permanentes fournies.

L’exemple de code Scala suivant fournit un exemple d’authentification à l’aide d’identifiants temporaires :

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as temporary_aws_access_key_id are, of course,
// also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())
Copy

sfOptions2 peut maintenant être utilisé avec la méthode options() DataFrame.

Authentification Azure pour l’échange de données

Cette section décrit comment s’authentifier en cas d’utilisation du stockage Azure Blob pour échanger des données.

S’authentifier de cette façon est uniquement nécessaire dans l’une des circonstances suivantes :

  • La version du connecteur Snowflake pour Spark est 2.1.x (ou inférieure). Depuis la version 2.2.0, le connecteur utilise une zone de préparation interne temporaire Snowflake pour les échanges de données. Si vous n’utilisez pas actuellement la version 2.2.0 (ou supérieure) du connecteur, Snowflake recommande fortement la mise à niveau vers la dernière version.

  • La version du connecteur Snowflake pour Spark est 2.2.0 (ou supérieure), mais vos opérations dépassent généralement les 36 heures. C’est la durée maximale du jeton Azure utilisé par le connecteur pour accéder à la zone de préparation interne pour l’échange de données.

Vous devez préparer un conteneur de stockage Azure Blob que le connecteur peut utiliser pour échanger des données entre Snowflake et Spark.

Utilisation d’informations d’authentification Azure

Il s’agit de la méthode d’authentification standard du stockage Azure Blob. Elle nécessite une paire de valeurs : tempDir (une URL) and des valeurs temporary_azure_sas_token.

Note

Ces valeurs doivent également être utilisées pour configurer Hadoop/Spark pour accéder au stockage Azure Blob. Pour plus d’informations, y compris des exemples, voir Authentification Hadoop/Spark avec Azure (dans ce chapitre).

Par exemple :

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

// Then, configure your Snowflake environment
//
val sfOptions = Map(
  "sfURL" -> "<account_identifier>.snowflakecomputing.com",
  "sfUser" -> "<user_name>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database_name>",
  "sfSchema" -> "<schema_name>",
  "sfWarehouse" -> "<warehouse_name>",
  "sfCompress" -> "on",
  "sfSSL" -> "on",
  "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/",
  "temporary_azure_sas_token" -> "<azure_sas>"
)
Copy

Pour plus de détails sur les options prises en charge par sfOptions, voir Options Azure pour le transfert de données externe (dans ce chapitre).

Authentification Hadoop/Spark avec Azure

Pour utiliser cette méthode, modifiez les exemples Scala de ce chapitre pour ajouter les options de configuration Hadoop suivantes :

val hadoopConf = sc.hadoopConfiguration
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Copy

Assurez-vous que l’option tempdir utilise également wasb://.

L’authentification via un navigateur n’est pas prise en charge

Lorsque vous utilisez Spark Connector, il est peu pratique d’utiliser une forme d’authentification permettant d’afficher une fenêtre de navigateur pour demander à l’utilisateur des informations d’identification. La fenêtre n’apparaîtrait pas nécessairement sur l’ordinateur client. Par conséquent, Spark Connector ne prend en charge aucun type d’authentification, y compris MFA (authentification multifactorielle) ou SSO (authentification unique), qui appellerait une fenêtre du navigateur.