Utilisation du connecteur Spark

Le connecteur adhère à l’API Spark standard, mais avec deux options supplémentaires spécifiques à Snowflake décrites dans ce chapitre.

Dans ce chapitre, le terme COPY se réfère à :

  • COPY INTO <table> (utilisé pour transférer des données d’une zone de préparation interne ou externe vers une table).

  • COPY INTO <emplacement> (utilisé pour transférer des données d’une table vers une zone de préparation interne ou externe).

Dans ce chapitre :

Vérification de la connexion réseau à Snowflake avec SnowCD

Après avoir configuré votre pilote, vous pouvez évaluer et dépanner votre connectivité réseau à Snowflake en utilisant SnowCD.

Vous pouvez utiliser SnowCD pendant le processus de configuration initiale et à la demande à tout moment pour évaluer et dépanner votre connexion réseau à Snowflake.

Pushdown

Le connecteur Spark applique un pushdown de prédicat et de requête en capturant et en analysant les plans logiques Spark pour les opérations SQL. Lorsque la source de données est Snowflake, les opérations sont traduites en une requête SQL puis exécutées dans Snowflake pour améliorer les performances.

Cependant, comme cette traduction nécessite presque de traduire un par un les opérateurs SQL Spark en expressions Snowflake, tous les opérateurs Spark SQL ne peuvent pas être push down. Lorsque le pushdown échoue, le connecteur revient à un plan d’exécution moins optimisé. Les opérations non prises en charge sont à la place exécutées dans Spark.

Vous trouverez ci-dessous une liste des opérations prises en charge pour le pushdown (toutes les fonctions ci-dessous utilisent leurs noms Spark). Si une fonction ne figure pas dans cette liste, un plan Spark qui l’utilise peut être exécuté sur Spark plutôt que push down à Snowflake.

  • Fonctions d’agrégation

    • Average

    • CorrCovPopulation

    • CovSample

    • Count

    • Max

    • Min

    • StddevPop

    • StddevSamp

    • Sum

    • VariancePop

    • VarianceSamp

  • Opérateurs booléens

    • And

    • Between

    • Contains

    • EndsWith

    • EqualTo

    • GreaterThan

    • GreaterThanOrEqual

    • In

    • IsNull

    • IsNotNull

    • LessThan

    • LessThanOrEqual

    • Not

    • Or

    • StartsWith

  • Fonctions mathématiques

    • Opérateurs arithmétiques « + » (addition), « - » (soustraction), « * » (multiplication), « / » (division) et « - » (négation unaire).

    • Abs

    • Acos

    • Asin

    • Atan

    • Ceil

    • Cos

    • Cosh

    • Exp

    • Floor

    • Greatest

    • Least

    • Log

    • Pi

    • Pow

    • Rand

    • Round

    • Sin

    • Sinh

    • Sqrt

    • Tan

    • Tanh

  • Opérateurs divers

    • Alias (expressions AS)

    • BitwiseAnd

    • BitwiseNot

    • BitwiseOr

    • BitwiseXor

    • Cast(enfant, t, _)

    • DateAdd

    • If

    • MakeDecimal

    • ScalarSubquery

    • ShiftLeft

    • ShiftRight

    • SortOrder

    • UnscaledValue

  • Opérateurs relationnels

    • Fonctions d’agrégation et clauses group-by

    • Distinct

    • Filters

    • In

    • InSet

    • Joins

    • Limits

    • Projections

    • Sorts (ORDER BY)

    • Union and Union All

    • Fonctions de fenêtre et clauses de fenêtrage

  • Fonctions de chaîne de caractères

    • Ascii

    • Concat(enfants)

    • Like

    • Lower

    • StringLPad

    • StringRPad

    • StringTranslate

    • StringTrim

    • StringTrimLeft

    • StringTrimRight

    • Substring

    • Upper

  • Fonctions de fenêtre (remarque : elles ne fonctionnent pas avec Spark 2.2)

    • RowNumber

Utilisation du connecteur dans Scala

Spécification du nom de classe de source de données

Pour utiliser Snowflake comme source de données dans Spark, utilisez l’option .format pour fournir le nom de classe du connecteur Snowflake qui définit la source de données.

net.snowflake.spark.snowflake

Pour assurer une vérification du nom de la classe lors de la compilation, Snowflake recommande fortement de définir une variable pour le nom de classe. Par exemple :

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

De plus, par souci de facilité, la classe Utils fournit la variable qui peut être importée comme suit :

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

Note

Tous les exemples de ce chapitre utilisent SNOWFLAKE_SOURCE_NAME comme définition de classe.

Activation/désactivation du pushdown dans une session

La version 2.1.0 (et supérieure) du connecteur prend en charge le pushdown de requêtes, ce qui peut améliorer considérablement les performances en poussant le traitement des requêtes vers Snowflake lorsque Snowflake est la source de données Spark.

Par défaut, le pushdown est activé.

Pour le désactiver dans une session Spark, après avoir instancié un objet SparkSession, utilisez l’appel de méthode statique suivant :

SnowflakeConnectorUtils.disablePushdownSession(spark)

spark est votre objet SparkSession.

Vous pouvez réactiver le pushdown à tout moment en utilisant la méthode suivante :

SnowflakeConnectorUtils.enablePushdownSession(spark)

Transfert de données depuis Snowflake vers Spark

Note

Lorsque vous utilisez des DataFrames, le connecteur Snowflake ne prend en charge que les requêtes SELECT.

Pour lire des données de Snowflake vers un DataFrame Spark :

  1. Utilisez la méthode read() de l’objet SqlContext pour construire un DataFrameReader.

  2. Spécifiez SNOWFLAKE_SOURCE_NAME en utilisant la méthode format(). Pour la définition, voir Spécification du nom de classe de source de données (dans ce chapitre).

  3. Spécifiez les options du connecteur en utilisant la méthode option() ou options(). Pour plus d’informations, voir Réglage des options de configuration du connecteur (dans ce chapitre).

  4. Spécifiez l’une des options suivantes pour les données de table à lire :

    • dbtable: le nom de la table à lire. Toutes les colonnes et tous les enregistrements sont récupérés (c’est-à-dire que cela est équivalent à SELECT * FROM table_bd).

    • query: la requête exacte (instruction SELECT) à exécuter.

Notes sur l’utilisation

  • Actuellement, le connecteur ne prend pas en charge d’autres types de requête (par ex. les instructions SHOW, DESC ou DML) en cas d’utilisation de DataFrames.

  • Il y a une limite supérieure à la taille d’une ligne individuelle. Pour plus de détails, voir Limites de la taille du texte de requête.

Remarques sur les performances

Lorsque vous transférez des données entre Snowflake et Spark, utilisez les méthodes suivantes pour analyser/améliorer les performances :

  • Utilisez la méthode net.snowflake.spark.snowflake.Utils.getLastSelect() pour voir la requête actuelle émise lors du transfert de données depuis Snowflake vers Spark.

  • Si vous utilisez la fonctionnalité filter ou where du DataFrame Spark, vérifiez que les filtres respectifs sont présents dans la requête SQL émise. Le connecteur Snowflake essaie de traduire tous les filtres demandés par Spark en SQL.

    Cependant, il existe des formes de filtre que l’infrastructure Spark ne transmet pas aujourd’hui au connecteur Snowflake. Par conséquent, dans certaines situations, un grand nombre d’enregistrements inutiles sont demandés à Snowflake.

  • Si vous n’avez besoin que d’un sous-ensemble de colonnes, assurez-vous qu’il reflète le sous-ensemble dans la requête SQL.

  • En général, si la requête SQL émise ne correspond pas à ce que vous attendez sur la base des opérations de DataFrame, utilisez l’option query pour fournir la syntaxe SQL exacte que vous souhaitez.

Exemples

Lire une table entière :

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

Lire les résultats d’une requête :

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
    .load()

Transfert de données de Spark à Snowflake

Les étapes pour enregistrer le contenu d’un DataFrame dans une table Snowflake sont similaires à celles servant à effectuer une écriture depuis Snowflake vers Spark :

  1. Utilisez la méthode write() du DataFrame pour construire un DataFrameWriter.

  2. Spécifiez SNOWFLAKE_SOURCE_NAME en utilisant la méthode format(). Pour la définition, voir Spécification du nom de classe de source de données (dans ce chapitre).

  3. Spécifiez les options du connecteur en utilisant la méthode option() ou options(). Pour plus d’informations, voir Réglage des options de configuration du connecteur (dans ce chapitre).

  4. Utilisez l’option dbtable pour spécifier la table dans laquelle les données sont écrites.

  5. Utilisez la méthode mode() pour spécifier le mode de sauvegarde du contenu.

    Pour plus d’informations, voir SaveMode (documentation Spark).

Exemples

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Exportation de JSON de Spark vers Snowflake

Les DataFrames Spark peuvent contenir des objets JSON sérialisés en tant que chaînes. Le code suivant fournit un exemple de conversion d’un DataFrame classique en DataFrame contenant des données JSON :

val rdd = myDataFrame.toJSON
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)

Notez que le résultat jsonDataFrame contient une seule colonne de type StringType. Par conséquent, lorsque ce DataFrame est exporté vers Snowflake avec le mode commun SaveMode.Overwrite, une nouvelle table est créée dans Snowflake avec une seule colonne de type VARCHAR.

Pour charger jsonDataFrame dans une colonne VARIANT :

  1. Créez une table Snowflake (connectée à Snowflake dans Scala à l’aide du pilote JDBC Snowflake). Voir Configuration du pilote JDBC pour obtenir une description du paramètre de connexion du pilote JDBC Snowflake :

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class SnowflakeJDBCExample {
      public static void main(String[] args) throws Exception {
        String jdbcUrl = "jdbc:snowflake://xy12345.snowflakecomputing.com/";
    
        Properties properties = new Properties();
        properties.put("user", "peter");
        properties.put("password", "test");
        properties.put("account", "xy12345");
        properties.put("warehouse", "mywh");
        properties.put("db", "mydb");
        properties.put("schema", "public");
    
        // get connection
        System.out.println("Create JDBC connection");
        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
        System.out.println("Done creating JDBC connection\n");
        // create statement
        System.out.println("Create JDBC statement");
        Statement statement = connection.createStatement();
        System.out.println("Done creating JDBC statement\n");
        // create a table
        System.out.println("Create my_variant_table table");
        statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
        statement.close();
        System.out.println("Done creating demo table\n");
    
        connection.close();
        System.out.println("Close connection\n");
      }
    }
    
  2. Au lieu d’utiliser SaveMode.Overwrite, utilisez SaveMode.Append pour réutiliser la table existante. Lorsque la valeur de chaîne représentant JSON est chargée dans Snowflake car la colonne cible est de type VARIANT, elle est analysée comme JSON. Par exemple :

    df.write
        .format(SNOWFLAKE_SOURCE_NAME)
        .options(sfOptions)
        .option("dbtable", "my_variant_table")
        .mode(SaveMode.Append)
        .save()
    

Exécution d’instructions DDL/DML SQL

Utilisez la méthode runQuery() de l’objet Utils pour exécuter les instructions DDL/DML SQL, en plus des requêtes, par exemple :

var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
    )
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")

sfOptions est la carte des paramètres utilisée pour lire/écrire des DataFrames.

La méthode runQuery ne renvoie que TRUE ou FALSE. Elle est destinée aux instructions qui ne renvoient pas de jeu de résultats, par exemple les instructions DDL telles que CREATE TABLE et DML telles que INSERT, UPDATE et DELETE. Cela n’est pas utile pour les instructions qui renvoient un jeu de résultats, tel que SELECT ou SHOW.

Utilisation d’horodatages et de fuseaux horaires

Spark ne fournit qu’un seul type d’horodatage équivalent au type d’horodatage Scala/Java. Son comportement est presque identique à celui du type de données TIMESTAMP_LTZ (fuseau horaire local) dans Snowflake. Ainsi, lorsque vous transférez des données entre Spark et Snowflake, nous vous recommandons d’utiliser les approches suivantes pour conserver l’heure correctement, par rapport aux fuseaux horaires :

  • Utilisez uniquement le type de données TIMESTAMP_LTZ dans Snowflake.

    Note

    Le mapping par défaut de type de données d’horodatage est TIMESTAMP_NTZ (aucun fuseau horaire). Vous devez donc définir explicitement le paramètre TIMESTAMP_TYPE_MAPPING pour utiliser TIMESTAMP_LTZ.

  • Réglez le fuseau horaire Spark sur UTC et utilisez ce fuseau horaire dans Snowflake (c.-à-d. ne réglez pas l’option sfTimezone pour le connecteur, et ne définissez pas explicitement un fuseau horaire dans Snowflake). Dans ce scénario, TIMESTAMP_LTZ et TIMESTAMP_NTZ sont effectivement équivalents.

    Pour régler le fuseau horaire, ajoutez la ligne suivante à votre code Spark :

    java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
    

Si vous ne mettez pas en œuvre l’une de ces méthodes, des modifications non souhaitées peuvent survenir. Par exemple, considérons le scénario suivant :

  • Le fuseau horaire dans Spark est réglé sur America/New_York.

  • Le fuseau horaire dans Snowflake est réglé sur Europe/Warsaw, ce qui peut arriver :

    • En réglant sfTimezone sur Europe/Warsaw pour le connecteur.

    • En réglant sfTimezone sur snowflake pour le connecteur et en définissant le paramètre de session TIMEZONE dans Snowflake sur Europe/Warsaw.

  • TIMESTAMP_NTZ et TIMESTAMP_LTZ sont tous deux utilisés dans Snowflake.

Dans ce scénario :

  1. Si une valeur représentant 12:00:00 dans une colonne TIMESTAMP_NTZ dans Snowflake est envoyée à Spark, cette valeur ne contient aucune information de fuseau horaire. Spark traite la valeur comme 12:00:00 à New York.

  2. Si Spark renvoie cette valeur 12:00:00 (à New York) à Snowflake pour être chargée dans une colonne TIMESTAMP_LTZ , elle est automatiquement convertie et chargée comme 18:00:00 (pour le fuseau horaire de Varsovie).

  3. Si cette valeur est ensuite convertie en TIMESTAMP_NTZ dans Snowflake, l’utilisateur voit 18:00:00, qui est différent de la valeur originale, 12:00:00.

Pour résumer, Snowflake recommande de suivre strictement au moins une de ces règles :

  • Utilisez le même fuseau horaire, de préférence UTC pour Snowflake et Spark.

  • Utilisez seulement le type de données TIMESTAMP_LTZ pour transférer des données entre Spark et Snowflake.

Exemple de programme Scala

Important

Cet exemple de programme suppose que vous utilisez la version 2.2.0 (ou supérieure) du connecteur qui utilise une zone de préparation interne Snowflake pour stocker des données temporaires, et qui n’a donc pas besoin d’un emplacement S3 pour stocker des données temporaires. Si vous utilisez une version antérieure, vous devez avoir un emplacement S3 existant et inclure des valeurs pour tempdir, awsAccessKey, awsSecretKey pour sfOptions. Pour plus de détails, voir Options AWS pour le transfert de données externe (dans ce chapitre).

Le programme Scala suivant fournit un cas d’utilisation complet pour le connecteur Snowflake pour Spark. Avant d’utiliser le code, remplacez les chaînes suivantes par les valeurs correspondantes, comme décrit dans Réglage des options de configuration du connecteur (dans ce chapitre) :

  • <nom_compte>: nom de votre compte (fourni par Snowflake).

  • <nom_utilisateur>, <mot de passe>: identifiants de connexion pour l’utilisateur Snowflake.

  • <base_de_données>, <schéma>, <entrepôt>: valeurs par défaut de la session Snowflake.

L’exemple de programme Scala utilise l’authentification de base (c’est-à-dire le nom d’utilisateur et le mot de passe). Si vous souhaitez vous authentifier avec OAuth, consultez Utilisation de l’OAuth externe (dans ce chapitre).

import org.apache.spark.sql._

//
// Configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) from t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Utilisation du connecteur avec Python

L’utilisation du connecteur avec Python est très similaire à l’utilisation de Scala.

Nous recommandons d’utiliser le script bin/pyspark inclus dans la distribution Spark.

Configuration du script pyspark

Le script pyspark doit être configuré de la même manière que le script spark-shell , en utilisant les options --packages ou --jars. Par exemple :

bin/pyspark --packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4

N’oubliez pas d’inclure les fichiers .jar du connecteur Snowflake Spark et du connecteur JDBC dans votre variable d’environnement CLASSPATH.

Pour plus d’informations sur la configuration du script spark-shell , voir Étape 4 : Configuration du cluster local Spark ou de l’environnement Spark hébergé sur Amazon EMR.

Activation/désactivation du pushdown dans une session

La version 2.1.0 (et supérieure) du connecteur prend en charge le pushdown de requêtes, ce qui peut améliorer considérablement les performances en poussant le traitement des requêtes vers Snowflake lorsque Snowflake est la source de données Spark.

Par défaut, le pushdown n’est pas activé. Pour l’activer dans une session Spark, après avoir instancié un objet SparkSession, utilisez l’appel de méthode statique suivant :

SnowflakeConnectorUtils.enablePushdownSession()

Par exemple :

sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

Dans cet exemple, sc est votre objet SparkSession.

Vous pouvez également le désactiver à tout moment en utilisant la méthode disablePushdownSession(). Par exemple :

sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

Exemple de script Python

Important

Cet exemple de script suppose que vous utilisez la version 2.2.0 (ou supérieure) du connecteur qui utilise une zone de préparation interne Snowflake pour stocker des données temporaires, et qui n’a donc pas besoin d’un emplacement S3 pour stocker ces données. Si vous utilisez une version antérieure, vous devez avoir un emplacement S3 existant et inclure des valeurs pour tempdir, awsAccessKey, awsSecretKey pour sfOptions. Pour plus de détails, voir Options AWS pour le transfert de données externe (dans ce chapitre).

Une fois le script pyspark configuré, vous pouvez effectuer des requêtes SQL et d’autres opérations. Voici un exemple de script Python qui effectue une simple requête SQL. Ce script illustre l’utilisation de base des connecteurs. La plupart des exemples Scala de ce document peuvent être adaptés avec un minimum d’efforts/de modifications pour une utilisation avec Python.

L’exemple de script Python utilise l’authentification de base (c’est-à-dire le nom d’utilisateur et le mot de passe). Si vous souhaitez vous authentifier avec OAuth, consultez Utilisation de l’OAuth externe (dans ce chapitre).

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<YOUR_APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<YOUR_AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<YOUR_AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_name>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

Astuce

Notez l’utilisation de sfOptions et SNOWFLAKE_SOURCE_NAME. Cela simplifie le code et réduit les risques d’erreur.

Pour plus de détails sur les options prises en charge pour sfOptions, voir Réglage des options de configuration du connecteur (dans ce chapitre).

Mappages de type de données

Le connecteur Spark prend en charge la conversion entre de nombreux types de données classiques.

De SQL Spark vers Snowflake

Type de données Spark

Type de données Snowflake

ArrayType

VARIANT

BinaryType

Non pris en charge

BooleanType

BOOLEAN

ByteType

INTEGER. Snowflake ne prend pas en charge le type BYTE.

DateType

DATE

DecimalType

DECIMAL

DoubleType

DOUBLE

FloatType

FLOAT

IntegerType

INTEGER

LongType

INTEGER

MapType

VARIANT

ShortType

INTEGER

StringType

Si une longueur est spécifiée, VARCHAR(N) ; autrement, VARCHAR

StructType

VARIANT

TimeType

Non pris en charge

TimestampType

TIMESTAMP

De Snowflake vers SQL Spark.

Type de données Snowflake

Type de données Spark

ARRAY

StringType

BIGINT

DecimalType(38, 0)

BINARY

Non pris en charge

BLOB

Non pris en charge

BOOLEAN

BooleanType

CHAR

StringType

CLOB

StringType

DATE

DateType

DECIMAL

DecimalType

DOUBLE

DoubleType

FLOAT

DoubleType

INTEGER

DecimalType(38, 0)

OBJECT

StringType

TIMESTAMP

TimestampType

TIME

StringType (Version du connecteur Spark 2.4.14 ou ultérieure)

VARIANT

StringType

Réglage des options de configuration du connecteur

Les options suivantes configurent le comportement du connecteur. Elles peuvent être spécifiées en utilisant .option(<clé>, <valeur>) ou .options(<map>) pour une classe Spark DataframeReader.

Astuce

Pour faciliter l’utilisation des options, Snowflake vous recommande de les enregistrer dans une seule variable Map et d’utiliser l”.options() API.

Options de connexion requises

Les options suivantes sont requises pour la connexion à Snowflake :

sfUrl

Spécifie le nom d’hôte de votre compte au format suivant :

nom_compte.snowflakecomputing.com

Cependant, notez que votre nom de compte complet peut inclure des segments supplémentaires identifiant la région et la plate-forme Cloud où votre compte est hébergé.

Exemples de noms de compte par région

Si votre nom de compte est xy12345 :

Plate-forme Cloud/Région

Nom de compte complet

AWS

US Ouest (Oregon)

xy12345

US Est (Ohio)

xy12345.us-east-2.aws

US Est (Virginie du Nord)

xy12345.us-east-1

US Est (Gouvernement commercial - Virginie du Nord)

xy12345.us-east-1-gov.aws

Canada (Centre)

xy12345.ca-central-1.aws

EU (Irlande)

xy12345.eu-west-1

EU (Francfort)

xy12345.eu-central-1

Asie-Pacifique (Tokyo)

xy12345.ap-northeast-1.aws

Asie Pacifique (Mumbai)

xy12345.ap-south-1.aws

Asie-Pacifique (Singapour)

xy12345.ap-southeast-1

Asie-Pacifique (Sydney)

xy12345.ap-southeast-2

GCP

US Central1 (Iowa)

xy12345.us-central1.gcp

Europe Ouest2 (Londres)

xy12345.europe-west2.gcp

Europe Ouest4 (Pays-Bas)

xy12345.europe-west4.gcp

Azure

Ouest US 2 (Washington)

xy12345.west-us-2.azure

Est US 2 (Virginie)

xy12345.east-us-2.azure

US Gov Virginia

xy12345.us-gov-virginia.azure

Canada Central (Toronto)

xy12345.canada-central.azure

Europe de l’Ouest (Pays-Bas)

xy12345.west-europe.azure

Suisse Nord (Zurich)

xy12345.switzerland-north.azure

Asie du Sud-Est (Singapour)

xy12345.southeast-asia.azure

Australie Est (Nouvelle-Galles du Sud)

xy12345.australia-east.azure

Important

Si l’une des conditions suivantes est remplie, le nom de votre compte est différent de la structure décrite dans cet exemple :

  • Si votre édition Snowflake est VPS, contactez le support Snowflake pour obtenir des détails sur le nom de votre compte.

  • Si AWS PrivateLink est activé pour votre compte, le nom de votre compte nécessite un segment privatelink supplémentaire. Pour plus de détails, voir AWS PrivateLink et Snowflake.

sfUser

Nom de connexion de l’utilisateur Snowflake.

sfPassword

Mot de passe de l’utilisateur Snowflake.

Notez que vous devez utiliser l’une des options suivantes pour vous authentifier : sfPassword, pem_private_key ou sfAuthenticator.

pem_private_key

Clé privée (au format PEM) pour l’authentification par paire de clés. Pour obtenir des instructions, voir Utilisation de l’authentification par paire de clés dans cette rubrique.

Notez que vous devez utiliser l’une des options suivantes pour vous authentifier : sfPassword, pem_private_key ou sfAuthenticator.

sfAuthenticator

Spécifie l’utilisation de OAuth externe pour s’authentifier auprès de Snowflake. Définissez la valeur sur oauth.

Notez que vous devez utiliser l’une des options suivantes pour vous authentifier : sfPassword, pem_private_key ou sfAuthenticator.

L’utilisation de l’OAuth externe nécessite de définir le paramètre sfToken.

sfToken

Définissez la valeur sur votre jeton d’accès OAuth externe.

Le paramètre de connexion nécessite de définir la valeur du paramètre sfAuthenticator sur oauth.

La valeur par défaut est aucun.

Options de contexte requises

Les options suivantes sont nécessaires pour définir le contexte de base de données et de schéma de la session :

sfDatabase

La base de données à utiliser pour la session après la connexion.

sfSchema

Le schéma à utiliser pour la session après la connexion.

Options supplémentaires

Toutes les autres options ne sont pas nécessaires :

sfAccount

Nom du compte (p. ex. xy12345). Cette option n’est plus nécessaire car le nom du compte est spécifié dans sfUrl. Elle n’est documentée ici que pour des raisons de rétrocompatibilité.

sfWarehouse

L’entrepôt virtuel par défaut à utiliser pour la session après la connexion.

sfRole

Le rôle de sécurité par défaut à utiliser pour la session après la connexion.

sfTimezone

Le fuseau horaire à utiliser par Snowflake en cas d’utilisation de Spark. Notez que le paramètre définit uniquement le fuseau horaire dans Snowflake. L’environnement Spark reste inchangé. Les valeurs prises en charge sont les suivantes :

  • spark: utilisez le fuseau horaire de Spark (par défaut).

  • snowflake: utilisez le fuseau horaire actuel pour Snowflake.

  • sf_default: utilisez le fuseau horaire par défaut pour l’utilisateur Snowflake qui se connecte.

  • fuseau_horaire : utilisez un fuseau horaire spécifique (par ex. America/New_York), si valide.

    Pour plus d’informations sur les conséquences de cette option, voir Utilisation d’horodatages et de fuseaux horaires (dans ce chapitre).

sfCompress

Si l’option est réglée sur on (par défaut), les données transmises entre Snowflake et Spark sont compressées.

s3MaxFileSize

La taille du fichier utilisé lors du transfert de données depuis Snowflake vers Spark. La valeur par défaut est 10MB.

parallelism

La taille du pool de threads à utiliser pour les téléchargements de données entre Snowflake et Spark. La valeur par défaut est 4.

En général, cette valeur n’a pas besoin d’être modifiée, sauf si vous avez un besoin spécifique d’augmenter ou de diminuer le débit. Le parallélisme dans les applications Spark se gère de préférence via des partitions et des exécuteurs. En outre, le degré de parallélisme ne doit pas être fixé à un nombre arbitrairement élevé pour produire un débit élevé. Cela peut avoir des effets négatifs et involontaires, dont potentiellement ralentir les téléchargements.

Exemple :

df.write
.format(SNOWFLAKE_SOURCE_NAME)
.option("parallelism", "8")
.mode(SaveMode.Overwrite)
.save()
preactions

Une liste de commandes SQL séparées par des points-virgules qui sont exécutées avant le transfert des données entre Spark et Snowflake.

Si une commande SQL contient %s, %s est remplacée par le nom de la table référencée pour l’opération.

postactions

Une liste de commandes SQL séparées par des points-virgules qui sont exécutées après le transfert des données entre Spark et Snowflake.

Si une commande SQL contient %s, elle est remplacée par le nom de la table référencée pour l’opération.

truncate_columns

Si l’option est définie sur on (par défaut), une commande COPY tronque automatiquement les chaînes de texte dépassant la longueur de la colonne cible. Si l’option est définie sur off, la commande produit une erreur si une chaîne chargée dépasse la longueur de la colonne cible.

truncate_table

Ce paramètre contrôle si le schéma d’une table cible Snowflake est conservé lorsque la table est écrasée.

Par défaut, lorsqu’une table cible dans Snowflake est écrasée, le schéma de cette table cible est également écrasé. Le nouveau schéma est basé sur le schéma de la table source (le dataframe Spark).

Cependant, le schéma de la source n’est parfois pas idéal. Par exemple, un utilisateur peut vouloir qu’une table cible Snowflake puisse stocker des valeurs FLOAT à l’avenir, même si le type de données de la colonne source initiale est INTEGER. Dans ce cas, le schéma de la table Snowflake ne doit pas être écrasé. La table Snowflake doit simplement être tronquée et réutilisée avec son schéma actuel.

Les valeurs possibles de ce paramètre sont :

  • on

  • off

Si ce paramètre est on, le schéma original de la table cible est conservé. Si ce paramètre est off, alors l’ancien schéma de la table est ignoré, et un nouveau schéma est généré sur la base du schéma de la source.

Ce paramètre est facultatif.

La valeur par défaut de ce paramètre est off (c’est-à-dire que, par défaut, le schéma de la table originale est écrasé).

Pour plus de détails sur le mappage de types de données Spark à des types de données Snowflake (et vice versa), voir Mappages de type de données (dans ce chapitre).

continue_on_error

Cette variable contrôle si la commande COPY abandonne si l’utilisateur saisit des données non valides (par exemple, format JSON invalide pour une colonne de type de données variable).

Les valeurs possibles sont :

  • on

  • off

La valeur on signifie continuer même si une erreur se produit. La valeur off signifie annuler si une erreur est détectée.

Ce paramètre est facultatif.

La valeur par défaut de ce paramètre est off.

Il n’est pas recommandé d’activer cette option. Si des erreurs sont détectées lors du processus de COPYing vers Snowflake à l’aide du connecteur Spark, alors certaines données peuvent être absentes.

Note

Si des lignes sont rejetées ou absents, et que ces lignes ne sont pas clairement défectueuses dans la source d’entrée, veuillez le signaler à Snowflake.

usestagingtable

Ce paramètre contrôle si le chargement de données utilise une table de mise en zone de préparation.

Une table d’échelonnement est une table normale (avec un nom temporaire) qui est créée par le connecteur. Si l’opération de chargement des données est réussie, la table cible originale est détruite et la table d’échelonnement est renommée au nom de la table cible d’origine. Si l’opération de chargement des données échoue, la table d’échelonnement est détruite et la table cible se retrouve avec les données qu’elle avait immédiatement avant l’opération. Ainsi, la table d’échelonnement permet de conserver les données de la table cible d’origine en cas d’échec de l’opération. Pour des raisons de sécurité, Snowflake recommande fortement l’utilisation d’une table d’échelonnement dans la plupart des cas.

Pour que le connecteur puisse créer une table d’échelonnement, l’utilisateur exécutant le COPY via le connecteur Spark doit avoir suffisamment de privilèges pour créer une table. Le chargement direct (c’est-à-dire le chargement sans utiliser de table d’échelonnement) est utile si l’utilisateur n’a pas la permission de créer une table.

Les valeurs possibles de ce paramètre sont :

  • on

  • off

Si le paramètre est on, une table d’échelonnement est utilisée. Si ce paramètre est off, les données sont chargées directement dans la table cible.

Ce paramètre est facultatif.

La valeur par défaut de ce paramètre est on (c.-à-d., utiliser une table de mise en zone de préparation).

autopushdown

Ce paramètre contrôle si le pushdown automatique de requêtes est activé.

Si le pushdown est activé, alors, lorsqu’une requête est exécutée sur Spark, si une partie de la requête peut être « poussée vers le bas » vers le serveur Snowflake, elle est alors poussée vers le bas. Ceci améliore les performances de certaines requêtes.

Ce paramètre est facultatif.

La valeur par défaut est on si le connecteur est connecté à une version compatible de Spark. Sinon, la valeur par défaut est off.

Si le connecteur est connecté à une version différente de Spark que celle à laquelle il est destiné (par exemple si la version 2.3 du connecteur est connectée à la version 2.2 de Spark), l’auto-pushdown est désactivé même si ce paramètre est réglé sur on.

purge

Si cette option est définie sur on, le connecteur supprime les fichiers temporaires créés lors du transfert depuis Spark vers Snowflake via un transfert de données externe. Si ce paramètre est réglé sur off, ces fichiers ne sont pas automatiquement supprimés par le connecteur.

La purge ne fonctionne que pour les transferts de Spark vers Snowflake, et non pour les transferts de Snowflake vers Spark.

Les valeurs possibles sont :

  • on

  • off

La valeur par défaut est off.

columnmap

Ce paramètre est utile lorsque vous écrivez des données de Spark dans Snowflake et que les noms de colonne de la table Snowflake ne correspondent pas à ceux de la table Spark. Vous pouvez créer une carte qui indique quelle colonne source Spark correspond à quelle colonne cible Snowflake.

Le paramètre est un littéral de chaîne unique, sous la forme suivante :

"Map(col_2 -> col_b, col_3 -> col_a)"

Par exemple, considérons le scénario suivant :

  • Supposons qu’un Dataframe nommé df dans Spark ait trois colonnes :

    col_1 , col_2 , col_3

  • Une table nommée tb dans Snowflake a deux colonnes :

    col_a , col_b

  • Vous souhaitez copier les valeurs suivantes :

    • De df.col_2 à tb.col_b.

    • De df.col_3 à tb.col_a.

La valeur du paramètre columnmap serait :

Map(col_2 -> col_b, col_3 -> col_a)

Vous pouvez générer cette valeur en exécutant le code Scala suivant :

Map("col_2"->"col_b","col_3"->"col_a").toString()

La valeur par défaut de ce paramètre est nulle. En d’autres termes, par défaut, les noms de colonne des tables source et cible doivent correspondre.

Ce paramètre n’est utilisé que pour effectuer une écriture depuis Spark vers Snowflake. Il ne s’applique pas pour une écriture depuis Snowflake vers Spark.

keep_column_case

Lors de l’écriture d’une table de Spark dans Snowflake, le connecteur Spark décale par défaut les lettres des noms de colonnes en majuscules, à moins que les noms de colonnes ne soient entre guillemets.

Lors de l’écriture d’une table de Snowflake dans Spark, le connecteur Spark ajoute par défaut des guillemets autour de tout nom de colonne contenant des caractères, à l’exception des lettres majuscules, des caractères de soulignement et des chiffres.

Si vous définissez keep_column_case sur on, le connecteur Spark n’effectuera pas ces modifications.

Les valeurs possibles sont :

  • on

  • off

La valeur par défaut est off.

column_mapping

Le connecteur doit mapper les colonnes du cadre de données Spark à la table Snowflake. Cela peut être effectué en fonction des noms de colonne (quel que soit leur ordre) ou de leur ordre (c’est-à-dire que la première colonne du cadre de données est mappée avec la première colonne de la table, quel que soit le nom de la colonne).

Par défaut, le mappage est effectué en fonction de l’ordre. Vous pouvez remplacer ce paramètre en définissant ce paramètre sur name, qui indique au connecteur de mapper les colonnes en fonction de leurs noms. (La correspondance de noms est insensible à la casse.)

Les valeurs possibles de ce paramètre sont :

  • order

  • name

La valeur par défaut est order.

column_mismatch_behavior

Ce paramètre ne s’applique que lorsque le paramètre column_mapping est défini sur name.

Si les noms de colonne dans le cadre de données Spark et la table de Snowflake ne correspondent pas, alors :

  • Si column_mismatch_behavior est error, Spark Connector signale une erreur.

  • Si column_mismatch_behavior est ignore, Spark Connector ignore l’erreur.

    • Le pilote supprime toute colonne du cadre de données Spark qui n’a pas de colonne correspondante dans la table Snowflake.

    • Le pilote insère NULLs dans n’importe quelle colonne de la table Snowflake qui n’a pas de colonne correspondante dans le cadre de données Spark.

Les erreurs potentielles incluent :

  • Le cadre de données Spark peut contenir des colonnes identiques sauf pour les majuscules/minuscules. Étant donné que le mappage des noms de colonne ne respecte pas la casse, il n’est pas possible de déterminer le mappage correct du cadre de données à la table.

  • La table Snowflake peut contenir des colonnes identiques sauf pour les majuscules/minuscules. Étant donné que le mappage des noms de colonne ne respecte pas la casse, il n’est pas possible de déterminer le mappage correct du cadre de données à la table.

  • Le cadre de données Spark et la table Snowflake peuvent ne pas avoir de noms de colonnes en commun. En théorie, le connecteur Spark pourrait insérer NULLs dans chaque colonne de chaque ligne, mais cela n’a généralement pas de sens. Le connecteur renvoie donc une erreur même si column_mismatch_behavior est défini sur ignore.

Les valeurs possibles de ce paramètre sont :

  • error

  • ignore

La valeur par défaut est error.

time_output_format

Ce paramètre permet à l’utilisateur de spécifier le format des données TIME renvoyées.

Les valeurs possibles de ce paramètre sont les valeurs possibles pour les formats d’heure spécifiés à Formats d’heure.

Ce paramètre affecte uniquement la sortie, pas l’entrée.

partition_size_in_mb

Ce paramètre est utilisé lorsque l’ensemble de résultats de la requête est très volumineux et doit être divisé en plusieurs partitions DataFrame. Ce paramètre spécifie la taille non compressée recommandée pour chaque partition DataFrame. Pour réduire le nombre de partitions, augmentez cette taille.

Cette taille est utilisée comme taille recommandée, la taille réelle des partitions peut être plus petite ou plus grande.

Cette option s’applique uniquement lorsque le paramètre use_copy_unload est FALSE.

Ce paramètre est facultatif.

La valeur par défaut est 100 (MB).

use_copy_unload

Si la valeur est FALSE, Snowflake utilise le format de données Arrow lors de la [sélection] SELECTing de données. Si la valeur est TRUE, Snowflake revient à l’ancien comportement consistant à utiliser la commande COPY UNLOAD pour transmettre les données sélectionnées.

Ce paramètre est facultatif.

La valeur par défaut est FALSE.

Utilisation de l’authentification par paire de clés

Snowflake utilise l’authentification par paire de clés plutôt que l’authentification par nom d’utilisateur/mot de passe typique. Cette méthode d’authentification nécessite une paire de clés de 2048 bits (minimum) RSA. Générez la paire de clés publiques-privées via OpenSSL. La clé publique est attribuée à l’utilisateur Snowflake qui utilisera le client Snowflake.

Pour configurer la paire de clés publiques/privées :

  1. Depuis la ligne de commande d’une fenêtre de terminal, générez une clé privée.

    Vous pouvez générer une version chiffrée de la clé privée ou une version non chiffrée de la clé privée.

    Pour générer une version non chiffrée, utilisez la commande suivante :

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
    

    Pour générer une version chiffrée, utilisez la commande suivante (qui omet « -nocrypt ») :

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
    

    Il est généralement plus sûr de générer une version chiffrée.

    Si vous utilisez la deuxième commande pour chiffrer la clé privée, OpenSSL vous invite à indiquer une phrase secrète utilisée pour chiffrer le fichier de clé privée. Nous vous recommandons d’utiliser une phrase de chiffrement forte pour protéger la clé privée. Enregistrez cette phrase secrète dans un emplacement sécurisé. Vous l’entrerez lorsque vous vous connecterez à Snowflake. Notez que la phrase de chiffrement n’est utilisée que pour protéger la clé privée et ne sera jamais envoyée à Snowflake.

    Exemple de clé privée PEM

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. Depuis la ligne de commande, générez la clé publique en faisant référence à la clé privée :

    En supposant que la clé privée soit contenue dans le fichier nommé « rsa_key.p8 », utilisez la commande suivante :

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    Exemple de clé publique PEM

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. Copiez les fichiers de clés publiques et privées dans un répertoire local en vue de leur stockage. Enregistrez le chemin d’accès aux fichiers. Notez que la clé privée est stockée au format PKCS#8 (Public Key Cryptography Standards) et est chiffrée à l’aide de la phrase de chiffrement que vous avez spécifiée à l’étape précédente ; toutefois, le fichier doit toujours être protégé contre tout accès non autorisé au moyen du mécanisme d’autorisation de fichier fourni par votre système d’exploitation. Il est de votre responsabilité de sécuriser le fichier lorsqu’il n’est pas utilisé.

  4. Attribuez la clé publique à l’utilisateur Snowflake en utilisant ALTER USER. Par exemple :

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    Note

    • Seuls les administrateurs de sécurité (c’est-à-dire les utilisateurs ayant le rôle SECURITYADMIN) ou ayant un rôle supérieur peuvent modifier un utilisateur.

    • Excluez l’en-tête et le pied de page de la clé publique dans l’instruction SQL.

    Vérifiez l’empreinte de la clé publique de l’utilisateur en utilisant DESCRIBE USER :

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    Note

    La propriété RSA_PUBLIC_KEY_2_FP est décrite dans Rotation de clé (dans ce chapitre).

  5. Envoyez une copie non chiffrée de la clé privée à l’aide de l’option de connexion pem_private_key.

Attention

Pour des raisons de sécurité, plutôt que de coder en dur la pem_private_key dans votre application, vous devez définir le paramètre de manière dynamique après avoir lu la clé à partir d’une source sécurisée. Si la clé est chiffrée, déchiffrez-la et envoyez la version déchiffrée.

Dans l’exemple Python, notez que le fichier pem_private_key, rsa_key.p8, est :

  • Est directement lu à partir d’un fichier protégé par mot de passe, à l’aide de la variable d’environnement PRIVATE_KEY_PASSPHRASE.

  • Utilise l’expression pkb dans la chaîne sfOptions.

Pour vous connecter, vous pouvez enregistrer l’exemple Python dans un fichier (par ex. : <file.py>) puis exécuter la commande suivante :

spark-submit --packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4 <file.py>

Python

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os

with open("<path>/rsa_key.p8", "rb") as key_file:
  p_key = serialization.load_pem_private_key(
    key_file.read(),
    password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
    backend = default_backend()
    )

pkb = p_key.private_bytes(
  encoding = serialization.Encoding.PEM,
  format = serialization.PrivateFormat.PKCS8,
  encryption_algorithm = serialization.NoEncryption()
  )

pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')

sfOptions = {
  "sfURL" : "<account_name>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "pem_private_key" : pkb,
  "sfDatabase" : "<database>",
  "sfSchema" : "schema",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "COLORS") \
    .load()

df.show()

Rotation de clé

Snowflake accepte plusieurs clés actives pour permettre une rotation ininterrompue. Faites pivoter et remplacez vos clés publiques et privées en fonction du calendrier d’expiration que vous suivez en interne.

Actuellement, vous pouvez utiliser les paramètres RSA_PUBLIC_KEY et RSA_PUBLIC_KEY_2 pour ALTER USER afin d’associer jusqu’à 2 clés publiques à un seul utilisateur.

Pour faire tourner vos clés :

  1. Effectuez les étapes de la section Utilisation de l’authentification par paire de clés pour :

    • Générer un nouvel ensemble de clés privées et publiques.

    • Attribuer la clé publique à l’utilisateur. Définir la valeur de la clé publique sur RSA_PUBLIC_KEY ou RSA_PUBLIC_KEY_2 (la valeur de la clé qui n’est pas actuellement utilisée). Par exemple :

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Mettez à jour le code pour vous connecter à Snowflake. Spécifiez la nouvelle clé privée.

    Snowflake vérifie la bonne clé publique active pour l’authentification sur la base de la clé privée soumise avec vos informations de connexion.

  3. Retirez l’ancienne clé publique du profil utilisateur. Par exemple :

    alter user jsmith unset rsa_public_key;
    

Utilisation de l’OAuth externe

À partir de la version du connecteur Spark 2.7.0, vous pouvez utiliser OAuth externe pour vous authentifier auprès de Snowflake à l’aide de l’exemple de programme Scala ou de l’exemple de script Python.

Avant d’utiliser OAuth externe et le connecteur Spark pour vous authentifier auprès de Snowflake, configurez une intégration de sécurité OAuth externe pour l’un des serveurs d’autorisation OAuth externe pris en charge ou un client personnalisé OAuth externe.

Dans les exemples Scala et Python, notez le remplacement du paramètre sfPassword par les paramètres sfAuthenticator et sfToken.

Scala :

// spark connector version

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.DataFrame

var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<username>",
    "sfAuthenticator" -> "oauth",
    "sfToken" -> "<external_oauth_access_token>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "region")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Python :

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<YOUR_APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<YOUR_AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<YOUR_AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_name>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfAuthenticator" : "oauth",
  "sfToken" : "<external_oauth_access_token>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

Options AWS pour le transfert de données externe

Ces options sont utilisées pour spécifier l’emplacement Amazon S3 où les données temporaires sont stockées, et fournissent des détails d’authentification pour accéder à cet emplacement. Ils sont uniquement requis si vous effectuez un transfert de données externe. Les transferts de données externes sont nécessaires si l’une des affirmations suivantes est vraie :

  • Vous utilisez la version 2.1.x ou une version inférieure du connecteur Spark (qui ne prend pas en charge les transferts internes).

  • Votre transfert prendra probablement 36 heures ou plus (les transferts internes utilisent des informations d’authentification temporaires qui expirent après 36 heures).

tempDir

L’emplacement S3 où sont stockées les données préparées (par exemple s3n://xy12345-bucket/spark-snowflake-tmp/).

Si tempDir est spécifié, vous devez également spécifier :

  • awsAccessKey , awsSecretKey . ou

  • temporary_aws_access_key_id , temporary_aws_secret_access_key, temporary_aws_session_token

awsAccessKey , awsSecretKey

Il s’agit d’informations d’authentification AWS standard qui permettent d’accéder à l’emplacement spécifié dans tempDir. Notez que ces deux options doivent être définies ensemble.

Si elles sont définies, elles peuvent être récupérées depuis un objet SparkContext existant.

Si vous spécifiez ces variables, vous devez également spécifier tempDir.

Ces informations d’authentification doivent également être définies pour le cluster Hadoop.

temporary_aws_access_key_id , temporary_aws_secret_access_key, temporary_aws_session_token

Il s’agit d’informations d’authentification AWS temporaires qui permettent d’accéder à l’emplacement spécifié dans tempDir. Notez que ces trois options doivent être définies ensemble.

De plus, si ces options sont définies, elles ont priorité sur les options awsAccessKey et awsSecretKey.

Si vous spécifiez temporary_aws_access_key_id, temporary_aws_secret_access_key et temporary_aws_session_token, vous devez également spécifier tempDir. Sinon, ces paramètres sont ignorés.

check_bucket_configuration

Si le paramètre est réglé sur on (par défaut), le connecteur vérifie si le compartiment utilisé pour le transfert de données possède une politique de cycle de vie configurée (voir Préparation d’un compartiment S3 AWS externe pour plus d’informations). S’il n’y a aucune politique de cycle de vie, un avertissement est enregistré.

La désactivation de cette option (en la réglant sur off) ignore cette vérification. Ceci peut être utile si un utilisateur peut accéder aux opérations de données de compartiment, mais pas aux politiques de cycle de vie du compartiment. La désactivation de cette option peut également accélérer légèrement les temps d’exécution des requêtes.

Pour plus de détails, voir Authentification S3 pour l’échange de données (dans ce chapitre).

Options Azure pour le transfert de données externe

Cette section décrit les paramètres qui s’appliquent au stockage Azure Blob lors de transferts de données externes. Les transferts de données externes sont nécessaires si l’une des affirmations suivantes est vraie :

  • Vous utilisez la version 2.1.x ou une version inférieure du connecteur Spark (qui ne prend pas en charge les transferts internes).

  • Votre transfert prendra probablement 36 heures ou plus (les transferts internes utilisent des informations d’authentification temporaires qui expirent après 36 heures).

Lorsque vous utilisez un transfert externe avec le stockage Azure Blob, vous spécifiez l’emplacement du conteneur Azure et le SAS (signature d’accès partagé) pour ce conteneur en utilisant les paramètres décrits ci-dessous.

tempDir

Le conteneur de stockage Azure Blob où sont stockées les données intermédiaires. Cela se présente sous la forme d’une URL, par exemple :

wasb://<conteneur_azure>@<compte_azure>.<point_de_terminaison_azure>/

temporary_azure_sas_token

Spécifiez le jeton SAS pour le stockage Azure Blob.

Pour plus de détails, voir Authentification Azure pour l’échange de données (dans ce chapitre).

Spécification des informations Azure pour le stockage temporaire dans Spark

Lorsque vous utilisez Azure Blob Storage pour fournir un stockage temporaire afin de transférer des données entre Spark et Snowflake, vous devez fournir à Spark, ainsi qu’au connecteur Snowflake Spark, l’emplacement et les informations d’identification du stockage temporaire.

Pour fournir l’emplacement de stockage temporaire à Spark, exécutez les commandes similaires à celles décrites ci-dessous sur votre cluster Spark :

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

Notez que la dernière commande contient les variables suivantes :

  • <conteneur> et <compte> : il s’agit du conteneur et du nom du compte pour votre déploiement Azure.

  • <point_de_terminaison_azure> : il s’agit du point de terminaison de votre emplacement de déploiement Azure. Par exemple, si vous utilisez un déploiement US Azure, le point de terminaison est probablement blob.core.windows.net.

  • <azure_sas> : il s’agit du jeton de sécurité Shared Access Signature.

Remplacez chacune de ces variables par les informations appropriées pour votre compte de stockage Azure Blob.

Transmission de paramètres de session Snowflake en tant qu’options pour le connecteur

Le connecteur Snowflake pour Spark prend en charge l’envoi de paramètres de session arbitraires à Snowflake (voir Paramètres de session pour plus d’informations). Ceci peut être réalisé en ajoutant une paire ("<clé>" -> "<valeur>") à l’objet options, où <clé> est le nom de paramètre de session et <valeur> est la valeur.

Note

La <valeur> devrait être une chaîne entourée de guillemets doubles, même pour les paramètres qui acceptent les nombres ou les valeurs booléennes (par exemple "1" ou "true").

Par exemple, l’exemple de code suivant transmet le paramètre de session USE_CACHED_RESULT avec une valeur de "false" qui désactive l’utilisation des résultats des requêtes exécutées précédemment :

// ... assuming sfOptions contains Snowflake connector options

// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method

Remarques relatives à la sécurité

Les clients doivent s’assurer que dans un système Spark à plusieurs nœuds, les communications entre les nœuds sont sécurisées. Le maître Spark envoie les informations d’identification Snowflake aux employés Spark afin que ces derniers puissent accéder aux zones de préparation Snowflake. Si les communications entre le maître Spark et les employés Spark ne sont pas sécurisées, les informations d’identification peuvent être lues par un tiers non autorisé.

Authentification S3 pour l’échange de données

Cette section décrit comment s’authentifier en cas d’utilisation de S3 pour échanger des données.

Cette tâche est seulement nécessaire dans l’une des circonstances suivantes :

  • La version du connecteur Snowflake pour Spark est 2.1.x (ou inférieure). Depuis la version 2.2.0, le connecteur utilise une zone de préparation interne temporaire Snowflake pour les échanges de données. Si vous n’utilisez pas actuellement la version 2.2.0 (ou supérieure) du connecteur, Snowflake recommande fortement la mise à niveau vers la dernière version.

  • La version du connecteur Snowflake pour Spark est 2.2.0 (ou supérieure), mais vos opérations dépassent généralement les 36 heures. C’est la durée maximale du jeton AWS utilisé par le connecteur pour accéder à la zone de préparation interne pour l’échange de données.

Si vous utilisez une ancienne version du connecteur, vous devez préparer un emplacement S3 que le connecteur peut utiliser pour échanger des données entre Snowflake et Spark.

Pour permettre l’accès au compartiment/répertoire S3 utilisé pour échanger des données entre Spark et Snowflake (comme spécifié pour tempDir), deux méthodes d’authentification sont prises en charge :

  • Informations d’authentification AWS permanentes (également utilisées pour configurer l’authentification Hadoop/Spark pour accéder à S3).

  • Informations d’authentification AWS temporaires

Utilisation d’informations d’authentification AWS permanentes

Il s’agit de la méthode d’authentification AWS standard. Elle nécessite une paire de valeurs awsAccessKey et awsSecretKey.

Note

Ces valeurs doivent également être utilisées pour configurer Hadoop/Spark pour accéder à S3. Pour plus d’informations, y compris des exemples, voir Authentification Hadoop/Spark à l’aide de S3A ou S3N (dans ce chapitre).

Par exemple :

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)

Pour plus de détails sur les options prises en charge par sfOptions, voir Options AWS pour le transfert de données externe (dans ce chapitre).

Authentification Hadoop/Spark à l’aide de S3A ou S3N

Les écosystèmes Hadoop/Spark prennent en charge 2 schémas URI pour accéder à S3 :

s3a://

Nouvelle méthode recommandée (pour Hadoop 2.7 et supérieur)

Pour utiliser cette méthode, modifiez les exemples Scala de ce chapitre pour ajouter les options de configuration Hadoop suivantes :

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)

Assurez-vous que l’option tempdir utilise également s3a://.

s3n://

Ancienne méthode (pour Hadoop 2.6 et antérieur)

Dans certains systèmes, il est nécessaire de la spécifier explicitement comme le montre l’exemple Scala suivant :

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)

Utilisation d’informations d’authentification AWS temporaires

Cette méthode utilise les options de configuration temporary_aws_access_key_id, temporary_aws_secret_access_key et temporary_aws_session_token pour le connecteur.

Cette méthode permet une sécurité supplémentaire en fournissant à Snowflake un accès temporaire au compartiment/répertoire S3 utilisé pour l’échange de données.

Note

Les informations d’authentification temporaires ne peuvent être utilisées que pour configurer l’authentification S3 pour le connecteur. Elles ne peuvent être utilisées pour configurer l’authentification Hadoop/Spark.

De plus, si vous fournissez des informations d’authentification temporaires, elles ont la priorité sur toutes les informations d’authentification permanentes fournies.

L’exemple de code Scala suivant fournit un exemple d’authentification à l’aide d’identifiants temporaires :

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as temporary_aws_access_key_id are, of course,
// also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())

sfOptions2 peut maintenant être utilisé avec la méthode options() DataFrame.

Authentification Azure pour l’échange de données

Cette section décrit comment s’authentifier en cas d’utilisation du stockage Azure Blob pour échanger des données.

S’authentifier de cette façon est uniquement nécessaire dans l’une des circonstances suivantes :

  • La version du connecteur Snowflake pour Spark est 2.1.x (ou inférieure). Depuis la version 2.2.0, le connecteur utilise une zone de préparation interne temporaire Snowflake pour les échanges de données. Si vous n’utilisez pas actuellement la version 2.2.0 (ou supérieure) du connecteur, Snowflake recommande fortement la mise à niveau vers la dernière version.

  • La version du connecteur Snowflake pour Spark est 2.2.0 (ou supérieure), mais vos opérations dépassent généralement les 36 heures. C’est la durée maximale du jeton Azure utilisé par le connecteur pour accéder à la zone de préparation interne pour l’échange de données.

Vous devez préparer un conteneur de stockage Azure Blob que le connecteur peut utiliser pour échanger des données entre Snowflake et Spark.

Utilisation d’informations d’authentification Azure

Il s’agit de la méthode d’authentification standard du stockage Azure Blob. Elle nécessite une paire de valeurs : tempDir (une URL) and des valeurs temporary_azure_sas_token.

Note

Ces valeurs doivent également être utilisées pour configurer Hadoop/Spark pour accéder au stockage Azure Blob. Pour plus d’informations, y compris des exemples, voir Authentification Hadoop/Spark avec Azure (dans ce chapitre).

Par exemple :

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

// Then, configure your Snowflake environment
//
val sfOptions = Map(
  "sfURL" -> "<account_name>.snowflakecomputing.com",
  "sfUser" -> "<user_name>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database_name>",
  "sfSchema" -> "<schema_name>",
  "sfWarehouse" -> "<warehouse_name>",
  "sfCompress" -> "on",
  "sfSSL" -> "on",
  "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/",
  "temporary_azure_sas_token" -> "<azure_sas>"
)

Pour plus de détails sur les options prises en charge par sfOptions, voir Options Azure pour le transfert de données externe (dans ce chapitre).

Authentification Hadoop/Spark avec Azure

Pour utiliser cette méthode, modifiez les exemples Scala de ce chapitre pour ajouter les options de configuration Hadoop suivantes :

val hadoopConf = sc.hadoopConfiguration
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

Assurez-vous que l’option tempdir utilise également wasb://.

L’authentification via un navigateur n’est pas prise en charge

Lorsque vous utilisez Spark Connector, il est peu pratique d’utiliser une forme d’authentification permettant d’afficher une fenêtre de navigateur pour demander à l’utilisateur des informations d’identification. La fenêtre n’apparaîtrait pas nécessairement sur l’ordinateur client. Par conséquent, Spark Connector ne prend en charge aucun type d’authentification, y compris MFA (authentification multifactorielle) ou SSO (authentification unique), qui appellerait une fenêtre du navigateur.