Verwenden des Spark-Konnektors

Der Konnektor entspricht der Standard-API von Spark, bietet jedoch zusätzliche Snowflake-spezifische Optionen, die unter diesem Thema beschrieben werden.

COPY bezieht sich unter diesem Thema auf:

  • COPY INTO <Tabelle> (zur Übertragung von Daten aus einem internen oder externen Stagingbereich in eine Tabelle).

  • COPY INTO <Speicherort> (zur Übertragung von Daten aus einer Tabelle in einen internen oder externen Stagingbereich).

Unter diesem Thema:

Überprüfen der Netzwerkverbindung zu Snowflake mit SnowCD

Nach der Konfiguration des Treibers können Sie die Netzwerkkonnektivität zu Snowflake mit SnowCD testen und Probleme beheben.

Sie können während der Erstkonfiguration und bei Bedarf jederzeit SnowCD verwenden, um Ihre Netzwerkverbindung zu Snowflake zu testen und Probleme zu beheben.

Pushdown

Der Spark-Konnektor wendet Prädikat- und Abfrage-Pushdown an, indem die logischen Pläne von Spark für SQL-Operationen erfasst und analysiert werden. Wenn die Datenquelle Snowflake ist, werden die Operationen in eine SQL-Abfrage übersetzt und dann in Snowflake ausgeführt, um die Leistung zu verbessern.

Da für diese Übersetzung jedoch eine nahezu Eins-zu-Eins-Übersetzung der Spark-SQL-Operatoren in Snowflake-Ausdrücke erforderlich ist, können nicht alle Spark-SQL-Operatoren für Pushdown verwendet werden. Wenn Pushdown fehlschlägt, greift der Konnektor auf einen weniger optimierten Ausführungsplan zurück. Die nicht unterstützten Operationen werden stattdessen in Spark ausgeführt.

Nachfolgend finden Sie eine Liste der unterstützten Pushdown-Operationen (für alle unten aufgeführten Funktionen werden deren Spark-Namen verwendet). Wenn eine Funktion nicht in dieser Liste enthalten ist, wird möglicherweise ein Spark-Plan, der diese Funktion verwendet, auf Spark ausgeführt, anstatt den Pushdown zu Snowflake durchzuführen.

  • Aggregatfunktionen

    • Average

    • CorrCovPopulation

    • CovSample

    • Count

    • Max

    • Min

    • StddevPop

    • StddevSamp

    • Sum

    • VariancePop

    • VarianceSamp

  • Boolesche Operatoren

    • And

    • Between

    • Contains

    • EndsWith

    • EqualTo

    • GreaterThan

    • GreaterThanOrEqual

    • In

    • IsNull

    • IsNotNull

    • LessThan

    • LessThanOrEqual

    • Not

    • Or

    • StartsWith

  • Mathematische Funktionen

    • Arithmetische Operatoren „+“ (Addition), „-“ (Subtraktion), „*“ (Multiplikation), „/“ (Division) und „-“ (unäre Negation).

    • Abs

    • Acos

    • Asin

    • Atan

    • Ceil

    • Cos

    • Cosh

    • Exp

    • Floor

    • Greatest

    • Least

    • Log

    • Pi

    • Pow

    • Rand

    • Round

    • Sin

    • Sinh

    • Sqrt

    • Tan

    • Tanh

  • Verschiedene Operatoren

    • Alias (AS-Ausdrücke)

    • BitwiseAnd

    • BitwiseNot

    • BitwiseOr

    • BitwiseXor

    • Cast(Unterelement, t, _)

    • DateAdd

    • If

    • MakeDecimal

    • ScalarSubquery

    • ShiftLeft

    • ShiftRight

    • SortOrder

    • UnscaledValue

  • Relationale Operatoren

    • Aggregatfunktionen und Group-By-Klauseln

    • Distinct

    • Filter

    • In

    • InSet

    • Joins

    • Einschränkungen

    • Projektionen

    • Sortierungen (ORDER BY)

    • Union und Union All

    • Fensterfunktionen und Fensterklauseln

  • Zeichenfolgenfunktionen

    • Ascii

    • Concat(Unterelemente)

    • Like

    • Lower

    • StringLPad

    • StringRPad

    • StringTranslate

    • StringTrim

    • StringTrimLeft

    • StringTrimRight

    • Teilzeichenfolge

    • Upper

  • Fensterfunktionen (Hinweis: Diese funktionieren nicht mit Spark 2.2)

    • RowNumber

Verwenden des Konnektors in Scala

Angeben des Namens der Datenquellenklasse

Damit Snowflake als Datenquelle in Spark verwendet werden kann, geben Sie mit der Option .format den Namen der Snowflake-Konnektorklasse an, die die Datenquelle definiert.

net.snowflake.spark.snowflake

Um beim Kompilieren eine Prüfung des Klassennamens sicherzustellen, empfiehlt Snowflake dringend, eine Variable für den Klassennamen zu definieren. Beispiel:

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

Außerdem stellt die Klasse Utils die Variable zur Verfügung, die wie folgt importiert werden kann:

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

Bemerkung

Alle Beispiele unter diesem Thema verwenden SNOWFLAKE_SOURCE_NAME als Klassendefinition.

Aktivieren/Deaktivieren von Pushdown in einer Sitzung

Version 2.1.0 (und höher) des Konnektors unterstützt die Abfrageverschiebung (Pushdown), wodurch die Performance deutlich gesteigert wird. Dabei wird die Verarbeitung von Abfragen an Snowflake übergeben, wenn Snowflake die Spark-Datenquelle ist.

Standardmäßig ist Pushdown aktiviert.

Um es innerhalb einer Spark-Sitzung zu deaktivieren, rufen Sie nach der Instanziierung eines SparkSession-Objekts die folgende statische Methode auf:

SnowflakeConnectorUtils.disablePushdownSession(spark)

Dabei ist spark Ihr SparkSession-Objekt.

Sie können Pushdown jederzeit wieder aktivieren, indem Sie die folgende Methode aufrufen:

SnowflakeConnectorUtils.enablePushdownSession(spark)

Verschieben von Daten von Snowflake zu Spark

Bemerkung

Wenn Sie DataFrames verwenden, unterstützt der Snowflake-Konnektor nur SELECT-Abfragen.

So lesen Sie Daten von Snowflake in einen Spark-DataFrame ein:

  1. Verwenden Sie die read()-Methode des SqlContext-Objekts, um einen DataFrameReader zu erstellen.

  2. Geben Sie SNOWFLAKE_SOURCE_NAME mit der Methode format() an. Weitere Informationen zur Definition finden Sie unter Angeben des Namens der Datenquellenklasse (unter diesem Thema).

  3. Geben Sie die Konnektoroptionen mit der Methode option() oder options() an. Weitere Informationen dazu finden Sie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema).

  4. Geben Sie eine der folgenden Optionen für die zu lesenden Tabellendaten an:

    • dbtable: Der Name der zu lesenden Tabelle. Alle Spalten und Datensätze werden abgerufen (d. h. es ist gleichbedeutend mit SELECT * FROM Datenbanktabelle).

    • query: Die genaue Abfrage (SELECT-Anweisung), die ausgeführt werden soll.

Nutzungshinweise

  • Derzeit unterstützt der Konnektor bei Verwendung von DataFrames keine anderen Typen von Abfragen (wie SHOW, DESC oder DML-Anweisungen).

  • Es gibt eine Obergrenze für die Größe einer einzelnen Zeile. Weitere Details dazu finden Sie unter Begrenzung der Abfragetextgröße.

Hinweise zur Performance

Wenn Sie Daten zwischen Snowflake und Spark übertragen, verwenden Sie die folgenden Methoden, um die Performance zu analysieren bzw. zu verbessern:

  • Verwenden Sie die net.snowflake.spark.snowflake.Utils.getLastSelect()-Methode, um die aktuelle Abfrage anzuzeigen, die beim Verschieben von Daten von Snowflake nach Spark ausgegeben wird.

  • Wenn Sie die Funktionalität filter oder where des Spark-DataFrame verwenden, überprüfen Sie, ob in der ausgeführten SQL-Abfrage die entsprechenden Filter vorhanden sind. Der Snowflake-Konnektor versucht, alle von Spark angeforderten Filter in SQL zu übersetzen.

    Es gibt jedoch Formen von Filtern, die die Spark-Infrastruktur derzeit nicht an den Snowflake-Konnektor übergibt. Infolgedessen wird in einigen Situationen eine große Anzahl unnötiger Datensätze von Snowflake angefordert.

  • Wenn Sie nur eine Teilmenge von Spalten benötigen, stellen Sie sicher, dass die Teilmenge in der SQL-Abfrage angegeben ist.

  • Im Allgemeinen, wenn die ausgegebene SQL-Abfrage nicht mit dem übereinstimmt, was Sie aufgrund der DataFrame-Operationen erwarten, verwenden Sie die Option query, um genau die gewünschte SQL-Syntax bereitzustellen.

Beispiele

Lesen einer vollständigen Tabelle:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

Lesen der Ergebnisse einer Abfrage:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
    .load()

Verschieben von Daten von Spark in Snowflake

Die Schritte zum Speichern des Inhalts eines DataFrame in eine Snowflake-Tabelle sind ähnlich dem Schreiben von Snowflake in Spark:

  1. Verwenden Sie die write()-Methode des DataFrame, um einen DataFrameWriter zu erstellen.

  2. Geben Sie SNOWFLAKE_SOURCE_NAME mit der Methode format() an. Weitere Informationen zur Definition finden Sie unter Angeben des Namens der Datenquellenklasse (unter diesem Thema).

  3. Geben Sie die Konnektoroptionen mit der Methode option() oder options() an. Weitere Informationen dazu finden Sie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema).

  4. Verwenden Sie die Option dbtable, um die Tabelle anzugeben, in die die Daten geschrieben werden sollen.

  5. Verwenden Sie die Methode mode(), um den Speichermodus für den Inhalt festzulegen.

    Weitere Informationen dazu finden Sie unter SaveMode (Spark-Dokumentation).

Beispiele

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Exportieren von JSON-Daten von Spark in Snowflake

Spark-DataFrames können JSON-Objekte enthalten, die als Zeichenfolgen serialisiert sind. Der folgende Code zeigt ein Beispiel für die Konvertierung eines regulären DataFrame in einen DataFrame, der JSON-Daten enthält:

val rdd = myDataFrame.toJSON
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)

Beachten Sie, dass der resultierende jsonDataFrame eine einzelne Spalte vom Typ StringType ist. Wenn dieser DataFrame mit dem üblichen SaveMode.Overwrite-Modus in Snowflake exportiert wird, wird eine neue Tabelle in Snowflake mit einer einzelnen Spalte vom Typ VARCHAR erstellt.

So laden Sie einen jsonDataFrame in eine VARIANT-Spalte:

  1. Erstellen Sie eine Snowflake-Tabelle (Verbinden mit Snowflake in Scala mithilfe des Snowflake-JDBC-Treibers). Eine Beschreibung der Verbindungsparameter des Snowflake-JDBC-Treibers finden Sie unter Konfigurieren des JDBC-Treibers:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class SnowflakeJDBCExample {
      public static void main(String[] args) throws Exception {
        String jdbcUrl = "jdbc:snowflake://xy12345.snowflakecomputing.com/";
    
        Properties properties = new Properties();
        properties.put("user", "peter");
        properties.put("password", "test");
        properties.put("account", "xy12345");
        properties.put("warehouse", "mywh");
        properties.put("db", "mydb");
        properties.put("schema", "public");
    
        // get connection
        System.out.println("Create JDBC connection");
        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
        System.out.println("Done creating JDBC connection\n");
        // create statement
        System.out.println("Create JDBC statement");
        Statement statement = connection.createStatement();
        System.out.println("Done creating JDBC statement\n");
        // create a table
        System.out.println("Create my_variant_table table");
        statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
        statement.close();
        System.out.println("Done creating demo table\n");
    
        connection.close();
        System.out.println("Close connection\n");
      }
    }
    
  2. Um die vorhandene Tabelle wiederzuverwenden, verwenden Sie SaveMode.Append anstelle von SaveMode.Overwrite. Wenn der Zeichenfolgenwert für JSON in Snowflake geladen wird, weil die Zielspalte vom Typ VARIANT ist, wird er als JSON analysiert. Beispiel:

    df.write
        .format(SNOWFLAKE_SOURCE_NAME)
        .options(sfOptions)
        .option("dbtable", "my_variant_table")
        .mode(SaveMode.Append)
        .save()
    

Ausführen von DDL/DML-SQL-Anweisungen

Verwenden Sie die runQuery()-Methode des Utils-Objekts, um zusätzlich zu Abfragen DDL/DML-SQL-Anweisungen auszuführen, z. B.:

var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
    )
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")

wobei sfOptions die Parameterzuordnung ist, die zum Lesen/Schreiben von DataFrames verwendet wird.

Die Methode runQuery gibt nur TRUE oder FALSE zurück. Sie ist für Anweisungen gedacht, die kein Resultset zurückgeben, z. B. DDL-Anweisungen wie CREATE TABLE und DML-Anweisungen wie INSERT, UPDATE und DELETE. Das ist hilft nicht bei Anweisungen, die ein Resultset zurückgeben, z. B. SELECT oder SHOW.

Arbeiten mit Zeitstempeln und Zeitzonen

Spark bietet nur einen Typ von Zeitstempel, der dem Scala/Java-Zeitstempeltyp entspricht. Dieser ist im Verhalten nahezu identisch mit dem Datentyp TIMESTAMP_LTZ (lokale Zeitzone) von Snowflake. Daher empfiehlt Snowflake beim Übertragen von Daten zwischen Spark und Snowflake die folgenden Ansätze, um die Zeit im Verhältnis zu den Zeitzonen korrekt zu erhalten:

  • Verwenden Sie in Snowflake nur den Datentyp TIMESTAMP_LTZ.

    Bemerkung

    Der standardmäßig zugeordnete Datentyp für den Zeitstempel ist TIMESTAMP_NTZ (keine Zeitzone). Daher müssen Sie ausdrücklich den Parameter TIMESTAMP_TYPE_MAPPING festlegen, um TIMESTAMP_LTZ verwenden zu können.

  • Stellen Sie die Spark-Zeitzone auf UTC, und verwenden Sie diese Zeitzone in Snowflake (d. h. setzen Sie nicht die Option sfTimezone für den Konnektor, und setzen Sie nicht explizit eine Zeitzone in Snowflake). In diesem Szenario sind TIMESTAMP_LTZ und TIMESTAMP_NTZ praktisch gleichwertig.

    Zum Einstellen der Zeitzone fügen Sie Ihrem Spark-Code die folgende Zeile hinzu:

    java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
    

Wenn Sie keinen dieser Ansätze implementieren, können unerwünschte Zeitänderungen auftreten. Betrachten Sie beispielsweise das folgende Szenario:

  • Die Zeitzone in Spark ist auf America/New_York eingestellt.

  • Die Zeitzone in Snowflake ist mit einer der beiden folgenden Methoden auf Europe/Warsaw eingestellt:

    • Setzen Sie sfTimezone für den Konnektor auf Europe/Warsaw.

    • Setzen Sie sfTimezone für den Konnektor auf snowflake und den Sitzungsparameter TIMEZONE in Snowflake auf Europe/Warsaw.

  • Sowohl TIMESTAMP_NTZ als auch TIMESTAMP_LTZ werden in Snowflake verwendet.

In diesem Szenario gilt:

  1. Wenn ein Wert, der 12:00:00 in einer TIMESTAMP_NTZ-Spalte in Snowflake repräsentiert, an Spark gesendet wird, enthält dieser Wert keine Zeitzoneninformationen. Spark behandelt den Wert als 12:00:00 in New York.

  2. Wenn Spark diesen Wert 12:00:00 (in New York) zurück an Snowflake sendet, um ihn in eine TIMESTAMP_LTZ-Spalte zu laden, wird er automatisch umgewandelt und als 18:00:00 (für die Zeitzone Warschau) geladen.

  3. Wenn dieser Wert dann in Snowflake in TIMESTAMP_NTZ umgewandelt wird, wird dem Benutzer 18:00:00 angezeigt, was sich vom ursprünglichen Wert 12:00:00 unterscheidet.

Zusammenfassend empfiehlt Snowflake, mindestens eine der folgenden Regeln strikt einzuhalten:

  • Verwenden Sie die gleiche Zeitzone, idealerweise UTC, sowohl für Spark als auch für Snowflake.

  • Verwenden Sie nur den Datentyp TIMESTAMP_LTZ für die Datenübertragung zwischen Spark und Snowflake.

Scala-Beispielprogramm

Wichtig

Dieses Beispielprogramm geht davon aus, dass Sie die Version 2.2.0 (oder höher) des Konnektors verwenden, der einen internen Snowflake-Stagingbereich zum Speichern temporärer Daten verwendet und daher keinen S3-Speicherort zum Speichern temporärer Daten benötigt. Wenn Sie eine frühere Version verwenden, müssen Sie über einen vorhandenen S3-Speicherort verfügen und Werte für tempdir, awsAccessKey, awsSecretKey für sfOptions angeben. Weitere Details dazu finden Sie unter AWS-Optionen für externe Datenübertragung (unter diesem Thema).

Das folgende Scala-Programm bietet einen vollständigen Anwendungsfall für den Snowflake-Konnektor für Spark. Ersetzen Sie vor der Verwendung des Codes die folgenden Zeichenfolgen durch die entsprechenden Werte, wie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema) beschrieben:

  • <Kontoname>: Name Ihres Kontos (bereitgestellt von Snowflake).

  • <Benutzername>, <Kennwort>: Anmeldeinformationen für den Snowflake-Benutzer.

  • <Datenbank>, <Schema>, <Warehouse>: Standardwerte für die Snowflake-Sitzung.

Das Scala-Beispielprogramm verwendet die Basisauthentifizierung (d. h. Benutzername und Kennwort). Wenn Sie sich mit OAuth authentifizieren möchten, finden Sie die entsprechenden Informationen unter Verwenden von External OAuth (unter diesem Thema).

import org.apache.spark.sql._

//
// Configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) from t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Verwenden des Konnektors mit Python

Die Verwendung des Konnektors mit Python ist der Verwendung von Scala sehr ähnlich.

Wir empfehlen Ihnen, das in der Spark-Distribution enthaltene bin/pyspark-Skript zu verwenden.

Konfigurieren des pyspark-Skripts

Das pyspark-Skript muss ähnlich wie das spark-shell-Skript konfiguriert werden, wobei die Optionen --packages oder --jars verwendet werden. Beispiel:

bin/pyspark --packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4

Vergessen Sie nicht, den Snowflake-Konnektor für Spark und die .jar-Dateien des JDBC-Konnektors in die CLASSPATH-Umgebungsvariable aufzunehmen.

Weitere Informationen zur Konfiguration des spark-shell-Skripts finden Sie unter Schritt 4: Lokales Spark-Cluster oder die von Amazon EMR gehostete Spark-Umgebung konfigurieren.

Aktivieren/Deaktivieren von Pushdown in einer Sitzung

Version 2.1.0 (und höher) des Konnektors unterstützt die Abfrageverschiebung (Pushdown), wodurch die Performance deutlich gesteigert wird. Dabei wird die Verarbeitung von Abfragen an Snowflake übergeben, wenn Snowflake die Spark-Datenquelle ist.

Standardmäßig ist Pushdown nicht aktiviert. Um es innerhalb einer Spark-Sitzung zu aktivieren, starten Sie nach der Instanziierung eines SparkSession-Objekts den folgenden statischen Methodenaufruf:

SnowflakeConnectorUtils.enablePushdownSession()

Beispiel:

sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

In diesem Beispiel ist sc Ihr SparkSession-Objekt.

Sie können es auch jederzeit deaktivieren, indem Sie die Methode disablePushdownSession() aufrufen. Beispiel:

sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

Beispiel für ein Python-Skript

Wichtig

Dieses Beispielskript geht davon aus, dass Sie die Version 2.2.0 (oder höher) des Konnektors verwenden, der einen internen Snowflake-Stagingbereich zum Speichern temporärer Daten verwendet und daher keinen S3-Speicherort zum Speichern dieser Daten benötigt. Wenn Sie eine frühere Version verwenden, müssen Sie über einen vorhandenen S3-Speicherort verfügen und Werte für tempdir, awsAccessKey, awsSecretKey für sfOptions angeben. Weitere Details dazu finden Sie unter AWS-Optionen für externe Datenübertragung (unter diesem Thema).

Nachdem das pyspark-Skript konfiguriert wurde, können Sie SQL-Abfragen und andere Operationen ausführen. Hier ist ein Beispiel für ein Python-Skript, das eine einfache SQL-Abfrage ausführt. Dieses Skript veranschaulicht die grundlegende Verwendung des Konnektors. Die meisten Scala-Beispiele in diesem Dokument können mit minimalem Aufwand für die Verwendung mit Python angepasst werden.

Das Python-Beispielskript verwendet die Basisauthentifizierung (d. h. Benutzername und Kennwort). Wenn Sie sich mit OAuth authentifizieren möchten, finden Sie die entsprechenden Informationen unter Verwenden von External OAuth (unter diesem Thema).

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<YOUR_APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<YOUR_AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<YOUR_AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_name>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

Tipp

Beachten Sie die Verwendung von sfOptions und SNOWFLAKE_SOURCE_NAME. Dies vereinfacht den Code und reduziert die Wahrscheinlichkeit von Fehlern.

Details zu den unterstützten Optionen für sfOptions finden Sie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema).

Zuordnen von Datentypen

Der Spark-Konnektor unterstützt die Konvertierung zwischen vielen gängigen Datentypen.

Von Spark SQL zu Snowflake

Spark-Datentyp

Snowflake-Datentyp

ArrayType

VARIANT

BinaryType

Nicht unterstützt

BooleanType

BOOLEAN

ByteType

INTEGER. Snowflake unterstützt den Typ BYTE nicht.

DateType

DATE

DecimalType

DECIMAL

DoubleType

DOUBLE

FloatType

FLOAT

IntegerType

INTEGER

LongType

INTEGER

MapType

VARIANT

ShortType

INTEGER

StringType

Wenn Länge angegeben ist, VARCHAR(N); andernfalls VARCHAR

StructType

VARIANT

TimeType

Nicht unterstützt

TimestampType

TIMESTAMP

Von Snowflake zu Spark SQL

Snowflake-Datentyp

Spark-Datentyp

ARRAY

StringType

BIGINT

DecimalType(38, 0)

BINARY

Nicht unterstützt

BLOB

Nicht unterstützt

BOOLEAN

BooleanType

CHAR

StringType

CLOB

StringType

DATE

DateType

DECIMAL

DecimalType

DOUBLE

DoubleType

FLOAT

DoubleType

INTEGER

DecimalType(38, 0)

OBJECT

StringType

TIMESTAMP

TimestampType

TIME

StringType (Spark-Konnektor-Version 2.4.14 oder höher)

VARIANT

StringType

Einstellen der Konfigurationsoptionen für den Konnektor

Mit den folgenden Optionen kann das Verhalten des Konnektors konfiguriert werden. Sie können mit .option(<Schlüssel>, <Wert>) oder .options(<Zuordnung>) für eine Spark-DataframeReader-Klasse angegeben werden.

Tipp

Um die Verwendung der Optionen zu vereinfachen, empfiehlt Snowflake, sie in einer einzigen Map-Variablen zu speichern und die .options()-API zu verwenden.

Erforderliche Verbindungsoptionen

Die folgenden Optionen sind für die Verbindung mit Snowflake erforderlich:

sfUrl

Gibt den Hostnamen für Ihr Konto im folgenden Format an:

Kontoname.snowflakecomputing.com

Beachten Sie jedoch, dass Ihr vollständiger Kontoname möglicherweise zusätzliche Segmente enthält, die die Region und die Cloudplattform angeben, wo Ihr Konto gehostet wird.

Beispiele für Kontonamen nach Region

Wenn Ihr Kontoname beispielsweise xy12345 ist:

Cloudplattform/Region

Vollständiger Kontoname

AWS

US West (Oregon)

xy12345

US East (Ohio)

xy12345.us-east-2.aws

US East (N. Virginia)

xy12345.us-east-1

US East (Commercial Gov - N. Virginia)

xy12345.us-east-1-gov.aws

Canada (Central)

xy12345.ca-central-1.aws

EU (Irland)

xy12345.eu-west-1

EU (Frankfurt)

xy12345.eu-central-1

Asia Pacific (Tokio)

xy12345.ap-northeast-1.aws

Asia Pacific (Mumbai)

xy12345.ap-south-1.aws

Asia Pacific (Singapur)

xy12345.ap-southeast-1

Asia Pacific (Sydney)

xy12345.ap-southeast-2

GCP

US Central1 (Iowa)

xy12345.us-central1.gcp

Europe West2 (London)

xy12345.europe-west2.gcp

Europe West4 (Niederlande)

xy12345.europe-west4.gcp

Azure

West US 2 (Washington)

xy12345.west-us-2.azure

East US 2 (Virginia)

xy12345.east-us-2.azure

US Gov Virginia

xy12345.us-gov-virginia.azure

Canada Central (Toronto)

xy12345.canada-central.azure

West Europe (Niederlande)

xy12345.west-europe.azure

Switzerland North (Zürich)

xy12345.switzerland-north.azure

Southeast Asia (Singapur)

xy12345.southeast-asia.azure

Australia East (New South Wales)

xy12345.australia-east.azure

Wichtig

Wenn eine der folgenden Bedingungen zutrifft, unterscheidet sich Ihr Kontoname von der oben beschriebenen Struktur:

  • Wenn Ihre Snowflake Edition VPS ist, wenden Sie sich an den Snowflake-Support, um Details zu Ihrem Kontonamen zu erhalten.

  • Wenn für Ihr Konto AWS PrivateLink aktiviert ist, muss der Kontoname ein zusätzliches privatelink-Segment enthalten. Weitere Details dazu finden Sie unter AWS PrivateLink & Snowflake.

sfUser

Anmeldename für den Snowflake-Benutzer.

sfPassword

Kennwort für den Snowflake-Benutzer.

Beachten Sie, dass Sie zur Authentifizierung eine der folgenden Optionen verwenden müssen: sfPassword, pem_private_key oder sfAuthenticator.

pem_private_key

Privater Schlüssel (im PEM-Format) für die Schlüsselpaar-Authentifizierung. Eine Anleitung hierzu finden Sie unter Verwenden der Schlüsselpaar-Authentifizierung (unter diesem Thema).

Beachten Sie, dass Sie zur Authentifizierung eine der folgenden Optionen verwenden müssen: sfPassword, pem_private_key oder sfAuthenticator.

sfAuthenticator

Gibt an, dass External OAuth zur Authentifizierung bei Snowflake verwendet wird. Setzen Sie den Wert auf oauth.

Beachten Sie, dass Sie zur Authentifizierung eine der folgenden Optionen verwenden müssen: sfPassword, pem_private_key oder sfAuthenticator.

Für die Verwendung von External OAuth muss der Parameter sfToken festgelegt werden.

sfToken

Setzen Sie den Wert auf Ihr External OAuth-Zugriffstoken.

Für diesen Verbindungsparameter muss der Parameterwert sfAuthenticator auf oauth gesetzt werden.

Der Standardwert ist „none“.

Erforderliche Kontextoptionen

Die folgenden Optionen sind erforderlich, um den Datenbank- und Schemakontext für die Sitzung festzulegen:

sfDatabase

Die Datenbank, die nach dem Verbinden für die Sitzung verwendet werden soll.

sfSchema

Das Schema, das nach dem Verbinden für die Sitzung verwendet werden soll.

Zusätzliche Optionen

Alle anderen Optionen sind nicht erforderlich:

sfAccount

Kontoname (z. B. xy12345). Diese Option ist nicht mehr erforderlich, da der Kontoname in sfUrl angegeben ist. Sie ist hier nur aus Gründen der Abwärtskompatibilität dokumentiert.

sfWarehouse

Das standardmäßige virtuelle Warehouse, das nach dem Verbinden für die Sitzung verwendet werden soll.

sfRole

Die Standardsicherheitsrolle, die nach dem Verbinden für die Sitzung verwendet werden soll.

sfTimezone

Die Zeitzone, die Snowflake bei der Verwendung von Spark verwenden soll. Beachten Sie, dass der Parameter nur die Zeitzone in Snowflake festlegt. Die Spark-Umgebung bleibt unverändert. Folgende Werte werden unterstützt:

  • spark: Verwendet die Zeitzone von Spark (Standard).

  • snowflake: Verwendet die aktuelle Zeitzone für Snowflake.

  • sf_default: Verwendet die Standardzeitzone des Snowflake-Benutzers, der sich verbindet.

  • Zeitzone: Verwendet eine bestimmte Zeitzone (z. B. America/New_York), falls gültig.

    Weitere Informationen zu den Auswirkungen dieser Option finden Sie unter Arbeiten mit Zeitstempeln und Zeitzonen (unter diesem Thema).

sfCompress

Wenn auf on (Standard) eingestellt, werden die zwischen Snowflake und Spark übermittelten Daten komprimiert.

s3MaxFileSize

Die Größe der Datei, die beim Verschieben von Daten von Snowflake nach Spark verwendet wird. Der Standardwert ist 10MB.

parallelism

Die Größe des Threadpools, der für Uploads und Downloads zwischen Snowflake und Spark verwendet werden soll. Die Voreinstellung ist 4.

Im Allgemeinen muss dieser Wert nicht geändert werden, es sei denn, Sie haben einen bestimmten Bedarf, den Durchsatz zu erhöhen oder zu verringern. Die Parallelität in Spark-Anwendungen wird am besten durch Partitionen und Executoren verwaltet. Darüber hinaus sollte der Grad der Parallelität nicht auf eine beliebig große Anzahl eingestellt werden, um einen hohen Durchsatz zu erzielen. Dies kann negative und unbeabsichtigte Auswirkungen haben, einschließlich potenziell langsamerer Uploads/Downloads.

Beispiel:

df.write
.format(SNOWFLAKE_SOURCE_NAME)
.option("parallelism", "8")
.mode(SaveMode.Overwrite)
.save()
preactions

Eine durch Semikolon getrennte Liste von SQL-Befehlen, die ausgeführt werden, bevor Daten zwischen Spark und Snowflake übertragen werden.

Wenn ein SQL-Befehl %s enthält, wird %s durch den Tabellennamen ersetzt, auf den für die Operation verwiesen wird.

postactions

Eine durch Semikolon getrennte Liste von SQL-Befehlen, die ausgeführt werden, nachdem Daten zwischen Spark und Snowflake übertragen wurden.

Wenn ein SQL-Befehl %s enthält, wird dies durch den Tabellennamen ersetzt, auf den für die Operation verwiesen wird.

truncate_columns

Wenn auf on (Standard) gesetzt, werden beim COPY-Befehl Textzeichenfolgen automatisch abgeschnitten, die die Länge der Zielspalte überschreiten. Wenn auf off gesetzt, erstellt der Befehl beim Überschreiten der Länge der Zielspalte einen Fehler.

truncate_table

Dieser Parameter steuert, ob Snowflake das Schema einer Snowflake-Zieltabelle beim Überschreiben dieser Tabelle beibehalten wird.

Wenn eine Zieltabelle in Snowflake überschrieben wird, wird standardmäßig auch das Schema dieser Zieltabelle überschrieben. Das neue Schema basiert auf dem Schema der Quelltabelle (dem Spark-DataFrame).

Manchmal ist das Schema der Quelle jedoch nicht ideal. Ein Benutzer kann beispielsweise wünschen, dass eine Snowflake-Zieltabelle in Zukunft FLOAT-Werte speichern kann, obwohl der Datentyp der ursprünglichen Quellspalte INTEGER ist. In diesem Fall sollte das Schema der Snowflake-Tabelle nicht überschrieben werden. Die Snowflake-Tabelle sollte lediglich abgeschnitten und dann mit ihrem aktuellen Schema wiederverwendet werden.

Die möglichen Werte dieses Parameters sind:

  • on

  • off

Wenn dieser Parameter on ist, wird das ursprüngliche Schema der Zieltabelle beibehalten. Wenn dieser Parameter off ist, dann wird das alte Schema der Tabelle ignoriert und ein neues Schema basierend auf dem Schema der Quelle erzeugt.

Dieser Parameter ist optional.

Der Standardwert dieses Parameters ist off (d. h. standardmäßig wird das ursprüngliche Tabellenschema überschrieben).

Weitere Details zum Zuordnen von Spark-Datentypen zu Snowflake-Datentypen (und umgekehrt) finden Sie unter Zuordnen von Datentypen (unter diesem Thema).

continue_on_error

Diese Variable steuert, ob der COPY-Befehl abgebrochen wird, wenn der Benutzer ungültige Daten eingibt (z. B. ungültiges JSON-Format für eine Spalte vom Datentyp Variant).

Die möglichen Werte sind:

  • on

  • off

Der Wert on bedeutet, dass bei Auftreten eines Fehlers der Vorgang fortgesetzt wird. Der Wert off bedeutet, dass bei Auftreten eines Fehlers der Vorgang abgebrochen wird.

Dieser Parameter ist optional.

Der Standardwert dieses Parameters ist off.

Es wird nicht empfohlen, diese Option zu aktivieren. Wenn beim Kopieren (COPY) in Snowflake mit dem Spark-Konnektor ein Fehler gemeldet wurde, dann führt dies wahrscheinlich zu fehlenden Daten.

Bemerkung

Wenn Zeilen abgelehnt werden oder fehlen und diese Zeilen in der Eingabequelle nicht eindeutig fehlerhaft sind, wenden Sie sich an Snowflake.

usestagingtable

Dieser Parameter steuert, ob das Laden von Daten über eine Stagingtabelle erfolgt.

Eine Staging-Tabelle ist eine normale Tabelle (mit einem temporären Namen), die vom Konnektor erstellt wird. Wenn die Datenladeoperation erfolgreich ist, wird die ursprüngliche Zieltabelle gelöscht, und die Staging-Tabelle in den Namen der ursprünglichen Zieltabelle umbenannt. Wenn die Datenladeoperation fehlschlägt, wird die Staging-Tabelle gelöscht, und die Zieltabelle wird mit den Daten belassen, die sie unmittelbar vor der Operation hatte. Somit ermöglicht die Staging-Tabelle, dass die ursprünglichen Daten der Zieltabelle beibehalten werden, wenn die Operation fehlschlägt. Aus Sicherheitsgründen empfiehlt Snowflake in den meisten Fällen dringend die Verwendung einer Staging-Tabelle.

Damit der Konnektor eine Staging-Tabelle erstellen kann, muss der Benutzer, der die Kopieroperation (COPY) über den Spark-Konnektor ausführt, über ausreichende Berechtigungen zum Erstellen einer Tabelle verfügen. Das direkte Laden (d. h. Laden ohne Verwendung einer Staging-Tabelle) ist sinnvoll, wenn der Benutzer keine Berechtigung zum Erstellen einer Tabelle hat.

Die möglichen Werte dieses Parameters sind:

  • on

  • off

Wenn der Parameter on ist, wird eine Staging-Tabelle verwendet. Wenn dieser Parameter off ist, dann werden die Daten direkt in die Zieltabelle geladen.

Dieser Parameter ist optional.

Der Standardwert dieses Parameters ist on (d. h. Stagingtabelle wird verwendet).

autopushdown

Dieser Parameter steuert, ob das automatische Abfrage-Pushdown aktiviert ist.

Wenn Pushdown aktiviert ist, wird beim Ausführen einer Abfrage in Spark ein Teil der Abfrageverarbeitung auf den Snowflake-Server „verschoben“. Dies verbessert die Performance einiger Abfragen.

Dieser Parameter ist optional.

Der Standardwert ist on, wenn der Konnektor mit einer kompatiblen Version von Spark verwendet wird. Andernfalls ist der Standardwert off.

Wenn der Konnektor mit einer anderen Version von Spark verwendet wird, als für den Konnektor vorgesehen (z. B. wenn die Version 2.3 des Konnektors mit Version 2.2 von Spark verwendet wird), dann ist der automatische Pushdown deaktiviert, auch wenn dieser Parameter auf on eingestellt ist.

purge

Wenn diese Einstellung auf on gesetzt ist, löscht der Konnektor temporäre Dateien, die beim Übertragen von Spark nach Snowflake per externer Datenübertragung erstellt wurden. Wenn dieser Parameter auf off gesetzt ist, werden diese Dateien nicht automatisch vom Konnektor gelöscht.

Das Bereinigen funktioniert nur bei Datenübertragungen von Spark nach Snowflake, nicht bei Datenübertragungen von Snowflake nach Spark.

Die möglichen Werte sind

  • on

  • off

Der Standardwert ist off.

columnmap

Dieser Parameter ist nützlich, wenn Sie Daten von Spark nach Snowflake schreiben und die Spaltennamen in der Snowflake-Tabelle nicht mit den Spaltennamen in der Spark-Tabelle übereinstimmen. Sie können eine Zuordnung erstellen, die angibt, welche Spark-Quellspalte zu welcher Snowflake-Zielspalte gehört.

Der Parameter ist ein einzelnes Zeichenfolgenliteral in Form von:

"Map(col_2 -> col_b, col_3 -> col_a)"

Betrachten Sie beispielsweise das folgende Szenario:

  • Angenommen, ein DataFrame namens df in Spark hat drei Spalten:

    col_1, col_2, col_3

  • Außerdem hat eine Tabelle mit dem Namen tb in Snowflake zwei Spalten:

    col_a, col_b

  • Sie möchten folgende Werte kopieren:

    • Von df.col_2 nach tb.col_b.

    • Von df.col_3 bis tb.col_a.

Der Wert des columnmap-Parameters wäre:

Map(col_2 -> col_b, col_3 -> col_a)

Sie können diesen Wert generieren, indem Sie den folgenden Scala-Code ausführen:

Map("col_2"->"col_b","col_3"->"col_a").toString()

Der Standardwert dieses Parameters ist null. Mit anderen Worten, standardmäßig sollten die Spaltennamen in der Quell- und Zieltabelle übereinstimmen.

Dieser Parameter wird nur beim Schreiben von Spark nach Snowflake verwendet. Er gilt nicht beim Schreiben von Snowflake nach Spark.

keep_column_case

Beim Schreiben einer Tabelle von Spark nach Snowflake wandelt der Spark-Konnektor die Buchstaben der Spaltennamen standardmäßig in Großbuchstaben um, sofern die Spaltennamen nicht in doppelte Anführungszeichen gesetzt sind.

Beim Schreiben einer Tabelle von Snowflake nach Spark setzt der Spark-Konnektor standardmäßig alle Spaltennamen in Anführungszeichen, die keine Großbuchstaben, Unterstriche und Ziffern enthalten.

Wenn Sie „keep_column_case“ auf on setzen, nimmt der Spark-Konnektor diese Änderungen nicht vor.

Die möglichen Werte sind:

  • on

  • off

Der Standardwert ist off.

column_mapping

Der Konnektor muss Spalten aus dem Spark-DataFrame der Snowflake-Tabelle zuordnen. Dies kann anhand von Spaltennamen (unabhängig von der Reihenfolge) oder anhand der Spaltenreihenfolge erfolgen (d. h. die erste Spalte im DataFrame wird unabhängig vom Spaltennamen der ersten Spalte in der Tabelle zugeordnet).

Standardmäßig erfolgt die Zuordnung anhand der Reihenfolge. Sie können diese Einstellung überschreiben, indem Sie den Parameter auf name setzen, wodurch der Konnektor angewiesen wird, Spalten anhand von Spaltennamen zuzuordnen. (Bei der Namenszuordnung wird Groß-/Kleinschreibung nicht berücksichtigt.)

Die möglichen Werte dieses Parameters sind:

  • order

  • name

Der Standardwert ist order.

column_mismatch_behavior

Dieser Parameter gilt nur, wenn der Parameter column_mapping auf name gesetzt ist.

Wenn die Spaltennamen im Spark-DataFrame und in der Snowflake-Tabelle nicht übereinstimmen, gilt Folgendes:

  • Wenn column_mismatch_behavior auf error gesetzt ist, meldet der Spark-Konnektor einen Fehler.

  • Wenn column_mismatch_behavior ignore ist, ignoriert der Spark-Konnektor den Fehler.

    • Der Treiber verwirft jede Spalte im Spark-DataFrame, die keine entsprechende Spalte in der Snowflake-Tabelle enthält.

    • Der Treiber fügt NULLs in jede Spalte der Snowflake-Tabelle ein, die keine entsprechende Spalte im Spark-DataFrame enthält.

Mögliche Fehler sind:

  • Der Spark-DataFrame kann Spalten enthalten, die bis auf die Groß-/Kleinschreibung identisch sind. Da bei der Zuordnung von Spaltennamen die Groß-/Kleinschreibung nicht berücksichtigt wird, kann die korrekte Zuordnung vom DataFrame zur Tabelle nicht ermittelt werden.

  • Die Snowflake-Tabelle kann Spalten enthalten, die bis auf die Groß-/Kleinschreibung identisch sind. Da bei der Zuordnung von Spaltennamen die Groß-/Kleinschreibung nicht berücksichtigt wird, kann die korrekte Zuordnung vom DataFrame zur Tabelle nicht ermittelt werden.

  • Der Spark-DataFrame und die Snowflake-Tabelle weisen möglicherweise keine gemeinsamen Spaltennamen auf. Theoretisch könnte der Spark-Konnektor NULLs in jede Spalte jeder Zeile einfügen, doch ist dies normalerweise sinnlos, sodass der Konnektor auch dann einen Fehler ausgibt, wenn column_mismatch_behavior auf ignore gesetzt ist.

Die möglichen Werte dieses Parameters sind:

  • error

  • ignore

Der Standardwert ist error.

time_output_format

Mit diesem Parameter kann der Benutzer das Format für die zurückgegebenen TIME-Daten angeben.

Die möglichen Werte dieses Parameters sind die möglichen Werte für Zeitformate, die unter Uhrzeitformate angegeben sind.

Dieser Parameter wirkt sich nur auf die Ausgabe aus, nicht auf die Eingabe.

partition_size_in_mb

Dieser Parameter wird verwendet, wenn das Resultset der Abfrage sehr groß ist und in mehrere DataFrame-Partitionen aufgeteilt werden muss. Dieser Parameter gibt die empfohlene unkomprimierte Größe für jede DataFrame-Partition an. Erhöhen Sie diesen Wert, um die Anzahl der Partitionen zu verringern.

Dieser Wert ist die empfohlene Größe. Die tatsächliche Größe der Partitionen kann kleiner oder größer sein.

Diese Option gilt nur, wenn der Parameter use_copy_unload den Wert FALSE hat.

Dieser Parameter ist optional.

Der Standardwert ist 100 (MB).

use_copy_unload

Wenn der Wert FALSE ist, verwendet Snowflake das Arrow-Datenformat für die Auswahl von Daten mit SELECT. Wenn der Wert TRUE ist, kehrt Snowflake zum alten Verhalten des Befehls COPY UNLOAD zurück, um ausgewählte Daten zu übertragen.

Dieser Parameter ist optional.

Der Standardwert ist FALSE.

Verwenden der Schlüsselpaar-Authentifizierung

Snowflake unterstützt die Verwendung von Schlüsselpaar-Authentifizierung anstelle der typischen Authentifizierung durch Benutzernamen und Kennwort. Dieses Authentifizierungsverfahren erfordert ein 2048-Bit-RSA-Schlüsselpaar (Minimum). Generieren Sie das Public/Private-Schlüsselpaar mit OpenSSL. Der öffentliche Schlüssel wird dem Snowflake-Benutzer zugewiesen, der den Snowflake-Client verwendet.

So konfigurieren Sie das Public/Private-Schlüsselpaar:

  1. Generieren Sie über die Befehlszeile in einem Terminalfenster einen privaten Schlüssel:

    Sie können entweder eine verschlüsselte Version des privaten Schlüssels oder eine unverschlüsselte Version des privaten Schlüssels generieren.

    Um eine unverschlüsselte Version zu generieren, verwenden Sie den folgenden Befehl:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
    

    Um eine verschlüsselte Version zu generieren, verwenden Sie den folgenden Befehl (ohne „-nocrypt“):

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
    

    Es ist normalerweise sicherer, eine verschlüsselte Version zu erstellen.

    Wenn Sie den zweiten Befehl zum Verschlüsseln des privaten Schlüssels verwenden, fordert OpenSSL Sie zur Eingabe einer Passphrase auf, die zum Verschlüsseln der Datei des privaten Schlüssels verwendet wird. Wir empfehlen, zum Schutz des privaten Schlüssels eine starke Passphrase zu verwenden. Notieren Sie sich diese Passphrase an einem sicheren Ort. Sie müssen sie beim Herstellen der Verbindung mit Snowflake eingeben. Beachten Sie, dass die Passphrase nur zum Schutz des privaten Schlüssels verwendet wird und niemals an Snowflake gesendet wird.

    Beispiel für einen privaten PEM-Schlüssel

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. Generieren Sie über die Befehlszeile den öffentlichen Schlüssel, indem Sie auf den privaten Schlüssel verweisen:

    Angenommen, der private Schlüssel befindet sich in der Datei „rsa_key.p8“, verwenden Sie den folgenden Befehl:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    Beispiel für einen öffentlichen PEM-Schlüssel

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. Kopieren Sie die Dateien der öffentlichen und privaten Schlüssel zur Speicherung in einem lokalen Verzeichnis. Zeichnen Sie den Pfad zu den Dateien auf. Beachten Sie, dass der private Schlüssel im Format PKCS#8 (Public Key Cryptography Standards) gespeichert und mit der im vorherigen Schritt angegebenen Passphrase verschlüsselt wird; die Datei sollte jedoch weiterhin mit dem von Ihrem Betriebssystem bereitgestellten Dateiberechtigungsmechanismus vor unbefugtem Zugriff geschützt sein. Es liegt in Ihrer Verantwortung, die Datei zu sichern, wenn sie nicht verwendet wird.

  4. Weisen Sie dem Snowflake-Benutzer den öffentlichen Schlüssel mit ALTER USER zu. Beispiel:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    Bemerkung

    • Nur Sicherheitsadministratoren (d. h. Benutzer mit der Rolle SECURITYADMIN oder höher) können andere Benutzer ändern.

    • Schließen Sie die Kopf- und Fußzeile des öffentlichen Schlüssels in der SQL-Anweisung aus.

    Überprüfen Sie den Fingerabdruck des öffentlichen Schlüssels des Benutzers mithilfe von DESCRIBE USER:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    Bemerkung

    Die RSA_PUBLIC_KEY_2_FP-Eigenschaft wird unter Schlüsselrotation (unter diesem Thema) beschrieben.

  5. Senden Sie eine unverschlüsselte Kopie des privaten Schlüssels mithilfe der Verbindungsoption pem_private_key.

Achtung

Aus Sicherheitsgründen sollten Sie diesen pem_private_key-Schlüssel in Ihrer Anwendung nicht fest codieren, sondern den Parameter dynamisch einstellen, nachdem Sie den Schlüssel aus einer sicheren Quelle gelesen haben. Wenn der Schlüssel verschlüsselt ist, entschlüsseln Sie ihn, und senden Sie die entschlüsselte Version.

Beachten Sie im Python-Beispiel, dass die pem_private_key-Datei rsa_key.p8:

  • direkt aus einer kennwortgeschützten Datei mit der Umgebungsvariablen PRIVATE_KEY_PASSPHRASE gelesen wird und

  • den Ausdruck pkb in der sfOptions-Zeichenfolge verwendet.

Um eine Verbindung herzustellen, können Sie das Python-Beispiel in einer Datei speichern (d. h. <file.py>) und dann den folgenden Befehl ausführen:

spark-submit --packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4 <file.py>

Python

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os

with open("<path>/rsa_key.p8", "rb") as key_file:
  p_key = serialization.load_pem_private_key(
    key_file.read(),
    password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
    backend = default_backend()
    )

pkb = p_key.private_bytes(
  encoding = serialization.Encoding.PEM,
  format = serialization.PrivateFormat.PKCS8,
  encryption_algorithm = serialization.NoEncryption()
  )

pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')

sfOptions = {
  "sfURL" : "<account_name>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "pem_private_key" : pkb,
  "sfDatabase" : "<database>",
  "sfSchema" : "schema",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "COLORS") \
    .load()

df.show()

Schlüsselrotation

Snowflake unterstützt mehrere aktive Schlüssel, um eine ununterbrochene Rotation zu ermöglichen. Rotieren und ersetzen Sie Ihre öffentlichen und privaten Schlüssel basierend auf dem Ablaufzeitplan, den Sie intern einhalten.

Derzeit können Sie die Parameter RSA_PUBLIC_KEY und RSA_PUBLIC_KEY_2 für ALTER USER verwenden, um einem einzelnen Benutzer bis zu zwei öffentliche Schlüssel zuzuordnen.

So rotieren Sie Ihre Schlüssel:

  1. Führen Sie die unter Verwenden der Schlüsselpaar-Authentifizierung angegebenen Schritte aus, um Folgendes zu ermöglichen:

    • Generieren eines neuen privaten und öffentlichen Schlüsselsatzes

    • Zuweisen des öffentlichen Schlüssels an einen Benutzer Setzen Sie den Wert des öffentlichen Schlüssels entweder auf RSA_PUBLIC_KEY oder RSA_PUBLIC_KEY_2 (je nachdem, welcher Schlüsselwert gerade nicht verwendet wird). Beispiel:

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Aktualisieren Sie den Code, um eine Verbindung zu Snowflake herzustellen. Geben Sie den neuen privaten Schlüssel an.

    Snowflake überprüft den korrekten aktiven öffentlichen Schlüssel für die Authentifizierung basierend auf dem privaten Schlüssel, der mit Ihren Verbindungsinformationen übermittelt wurde.

  3. Entfernen Sie den alten öffentlichen Schlüssel aus dem Benutzerprofil. Beispiel:

    alter user jsmith unset rsa_public_key;
    

Verwenden von External OAuth

Ab der Spark-Konnektorversion 2.7.0 können Sie sich mit External OAuth entweder mit dem Scala-Beispielprogramm oder dem Python-Beispielskript bei Snowflake authentifizieren.

Bevor Sie External OAuth und den Spark-Konnektor zur Authentifizierung bei Snowflake verwenden, konfigurieren Sie eine External OAuth-Sicherheitsintegration für einen der unterstützten External OAuth-Autorisierungsserver oder einen benutzerdefinierten Client für External OAuth.

Beachten Sie in den Scala- und Python-Beispielen, dass der Parameter sfPassword durch die Parameter sfAuthenticator und sfToken ersetzt wird.

Scala:

// spark connector version

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.DataFrame

var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<username>",
    "sfAuthenticator" -> "oauth",
    "sfToken" -> "<external_oauth_access_token>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "region")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Python:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<YOUR_APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<YOUR_AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<YOUR_AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_name>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfAuthenticator" : "oauth",
  "sfToken" : "<external_oauth_access_token>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

AWS-Optionen für externe Datenübertragung

Diese Optionen werden verwendet, um den Amazon S3-Speicherort für die Speicherung temporärer Daten anzugeben und um Authentifizierungsdetails für den Zugriff auf den Speicherort bereitzustellen. Sie sind nur erforderlich, wenn Sie eine externe Datenübertragung durchführen. Externe Datenübertragungen sind erforderlich, wenn eine der folgenden Voraussetzungen erfüllt ist:

  • Sie verwenden die Version 2.1.x oder niedriger des Spark-Konnektors (der keine interne Übertragung unterstützt) oder

  • Ihre Übertragung dauert wahrscheinlich 36 Stunden oder länger (interne Übertragung verwendet temporäre Anmeldeinformationen, die nach 36 Stunden ablaufen).

tempDir

Der S3-Speicherort, an dem Daten zwischengespeichert werden (z. B. s3n://xy12345-bucket/spark-snowflake-tmp/).

Wenn tempDir angegeben ist, müssen Sie auch eine der beiden Optionen angeben:

  • awsAccessKey, awsSecretKey . oder

  • temporary_aws_access_key_id, temporary_aws_secret_access_key, temporary_aws_session_token

awsAccessKey, awsSecretKey

Dies sind standardmäßige AWS-Anmeldeinformationen, die den Zugriff auf den in tempDir angegebenen Speicherort erlauben. Beachten Sie, dass diese beiden Optionen zusammen festgelegt werden müssen.

Wenn beide festgelegt sind, können sie aus dem vorhandenen SparkContext-Objekt abgerufen werden.

Wenn Sie diese Variablen angeben, müssen Sie auch tempDir angeben.

Diese Anmeldeinformationen sollten auch für den Hadoop-Cluster festgelegt werden.

temporary_aws_access_key_id, temporary_aws_secret_access_key, temporary_aws_session_token

Dies sind temporäre AWS-Anmeldeinformationen, die den Zugriff auf den in tempDir angegebenen Speicherort ermöglichen. Beachten Sie, dass alle drei Optionen zusammen festgelegt werden müssen.

Wenn diese Optionen festgelegt sind, haben sie Vorrang vor den Optionen awsAccessKey und awsSecretKey.

Wenn Sie temporary_aws_access_key_id, temporary_aws_secret_access_key und temporary_aws_session_token angeben, müssen Sie auch tempDir angeben. Andernfalls werden diese Parameter ignoriert.

check_bucket_configuration

Wenn auf on (Standard) gesetzt, prüft der Konnektor, ob für den Bucket, der zur Datenübertragung verwendet wird, eine Lebenszyklusrichtlinie konfiguriert ist (siehe Vorbereiten eines externen AWS S3-Buckets für weitere Informationen). Wenn keine Lebenszyklusrichtlinie vorhanden ist, wird eine Warnung protokolliert.

Wenn Sie diese Option deaktivieren (durch Setzen auf off), wird diese Prüfung übersprungen. Dies kann hilfreich sein, wenn ein Benutzer auf die Bucketdatenoperationen zugreifen darf, nicht aber auf die Lebenszyklusrichtlinien des Buckets. Das Deaktivieren der Option kann auch die Ausführungszeiten von Abfragen etwas beschleunigen.

Weitere Details dazu finden Sie unter Authentifizierung von S3 für den Datenaustausch (unter diesem Thema).

Azure-Optionen für externe Datenübertragung

In diesem Abschnitt werden die Parameter beschrieben, die für Azure Blob-Speicher bei externen Datenübertragungen gelten. Externe Datenübertragungen sind erforderlich, wenn eine der folgenden Voraussetzungen erfüllt ist:

  • Sie verwenden die Version 2.1.x oder niedriger des Spark-Konnektors (der keine interne Übertragung unterstützt) oder

  • Ihre Übertragung dauert wahrscheinlich 36 Stunden oder länger (interne Übertragung verwendet temporäre Anmeldeinformationen, die nach 36 Stunden ablaufen).

Wenn Sie eine externe Übertragung mit Azure Blob-Speicher verwenden, geben Sie den Speicherort des Azure-Containers und die SAS (Freigabesignatur) für diesen Container mit den unten beschriebenen Parametern an.

tempDir

Der Azure Blob-Speichercontainer, in dem Daten zwischengespeichert werden. Dies hat die Form einer URL, zum Beispiel:

wasb://<Azure-Container>@<Azure-Konto>.<Azure-Endpunkt>/

temporary_azure_sas_token

Geben Sie das SAS-Token für den Azure Blob-Speicher an.

Weitere Details dazu finden Sie unter Authentifizieren von Azure für den Datenaustausch (unter diesem Thema).

Angeben von Azure-Informationen für temporären Speicher in Spark

Wenn Sie mit Azure Blob Storage einen temporären Speicher zum Übertragen von Daten zwischen Spark und Snowflake bereitstellen, müssen Sie neben dem Snowflake-Konnektor für Spark den Speicherort und die Anmeldeinformationen für den temporären Speicher angeben.

Um Spark die Angaben zum temporären Speicherort bereitzustellen, führen Sie in Ihrem Spark-Cluster beispielsweise folgende Befehle aus:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

Beachten Sie, dass der letzte Befehl die folgenden Variablen enthält:

  • <Container> und <Konto>: Dabei handelt es sich um den Container- und Kontoname für Ihre Azure-Bereitstellung.

  • <Azure-Endpunkt>: Dies ist der Endpunkt Ihres Azure-Bereitstellungsorts. Wenn Sie beispielsweise eine Azure US-Bereitstellung verwenden, ist der Endpunkt wahrscheinlich blob.core.windows.net.

  • <Azure-SAS>: Dies ist das Shared Access Signature-Sicherheitstoken.

Ersetzen Sie jede dieser Variablen durch die passenden Informationen für Ihr Azure Blob Storage-Konto.

Übergeben von Snowflake-Sitzungsparametern als Optionen für den Konnektor

Der Snowflake-Konnektor für Spark unterstützt das Senden beliebiger Parameter auf Sitzungsebene an Snowflake (siehe Sitzungsparameter für weitere Informationen). Dies kann durch Hinzufügen eines ("<Schlüssel>" -> "<Wert>")-Paares zum options-Objekt erreicht werden, wobei <Schlüssel> der Sitzungsparametername und <Wert> der Wert ist.

Bemerkung

Der <Wert> sollte eine Zeichenfolge sein, die in doppelte Anführungszeichen eingeschlossen ist, auch für Parameter, die Zahlen oder boolesche Werte akzeptieren (z. B. "1" oder "true").

Im folgenden Codebeispiel wird der Sitzungsparameter USE_CACHED_RESULT mit dem Wert "false" übergeben, wodurch die Verwendung der Ergebnisse zuvor ausgeführter Abfragen deaktiviert wird:

// ... assuming sfOptions contains Snowflake connector options

// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method

Sicherheitshinweise

Kunden sollten sicherstellen, dass in einem Spark-System mit mehreren Knoten die Kommunikation zwischen den Knoten sicher ist. Der Spark-Master sendet Snowflake-Anmeldeinformationen an Spark-Mitarbeiter, damit diese auf Snowflake-Stagingbereiche zugreifen können. Wenn die Kommunikation zwischen dem Spark-Master und den Spark-Mitarbeitern nicht sicher ist, können die Anmeldeinformationen von einem nicht autorisierten Dritten gelesen werden.

Authentifizierung von S3 für den Datenaustausch

In diesem Abschnitt wird die Authentifizierung bei der Verwendung von S3 für den Datenaustausch beschrieben.

Die Ausführung dieser Aufgabe ist nur in einer der folgenden Situationen erforderlich:

  • Die Version des Snowflake-Konnektors für Spark ist 2.1.x (oder niedriger). Ab Version 2.2.0 verwendet der Konnektor für den Datenaustausch einen internen temporären Stagingbereich von Snowflake. Wenn Sie derzeit nicht die Version 2.2.0 (oder höher) des Konnektors verwenden, empfiehlt Snowflake dringend ein Upgrade auf die neueste Version.

  • Die Version des Snowflake-Konnektors für Spark ist 2.2.0 (oder höher), aber Ihre Jobs dauern regelmäßig länger als 36 Stunden. Dies ist die maximale Dauer für das AWS-Token, das der Konnektor verwendet, um auf den internen Stagingbereich für den Datenaustausch zuzugreifen.

Wenn Sie eine ältere Version des Konnektors verwenden, müssen Sie einen S3-Speicherort vorbereiten, den der Konnektor zum Datenaustausch zwischen Snowflake und Spark verwenden kann.

Um den Zugriff auf den S3-Bucket/das Verzeichnis für den Datenaustausch zwischen Spark und Snowflake zu ermöglichen (wie für tempDir angegeben), werden zwei Authentifizierungsmethoden unterstützt:

  • Permanente AWS-Anmeldeinformationen (auch zur Konfiguration der Hadoop/Spark-Authentifizierung für den Zugriff auf S3 verwendet)

  • Temporäre AWS-Anmeldeinformationen

Verwenden von permanenten AWS-Anmeldeinformationen

Dies ist die Standard-Authentifizierungsmethode für AWS. Sie erfordert ein Paar aus awsAccessKey- und awsSecretKey-Werten.

Bemerkung

Diese Werte sollten auch verwendet werden, um Hadoop/Spark für den Zugriff auf S3 zu konfigurieren. Weitere Informationen dazu, einschließlich Beispiele, finden Sie unter Authentifizieren von Hadoop/Spark mit S3A oder S3N (unter diesem Thema).

Beispiel:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)

Details zu den von sfOptions unterstützten Optionen finden Sie unter AWS-Optionen für externe Datenübertragung (unter diesem Thema).

Authentifizieren von Hadoop/Spark mit S3A oder S3N

Hadoop/Spark-Ökosysteme unterstützen 2 URI-Schemas für den Zugriff auf S3:

s3a://

Neue, empfohlene Methode (für Hadoop 2.7 und höher)

Um diese Methode zu verwenden, ändern Sie die Scala-Beispiele unter diesem Thema, und fügen Sie dann die folgenden Hadoop-Konfigurationsoptionen hinzu:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)

Stellen Sie sicher, dass die Option tempdir ebenfalls s3a:// verwendet.

s3n://

Ältere Methode (für Hadoop 2.6 und niedriger)

In einigen Systemen ist es notwendig, sie explizit anzugeben, wie im folgenden Scala-Beispiel gezeigt:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)

Verwenden von temporären AWS-Anmeldeinformationen

Bei dieser Methode werden die Konfigurationsoptionen temporary_aws_access_key_id temporary_aws_secret_access_key und temporary_aws_session_token für den Konnektor verwendet.

Diese Methode bietet zusätzliche Sicherheit, da Snowflake nur temporären Zugriff auf den S3-Bucket bzw. das Verzeichnis für den Datenaustausch erhält.

Bemerkung

Temporäre Anmeldeinformationen können nur zur Konfiguration der S3-Authentifizierung für den Konnektor verwendet werden. Sie können nicht zur Konfiguration der Hadoop/Spark-Authentifizierung verwendet werden.

Wenn Sie temporäre Anmeldeinformationen angeben, haben diese Vorrang vor allen anderen permanenten Anmeldeinformationen.

Der folgende Scala-Code zeigt ein Beispiel für das Authentifizieren mit temporären Anmeldeinformationen:

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as temporary_aws_access_key_id are, of course,
// also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())

sfOptions2 kann nun mit der options()-DataFrame -Methode verwendet werden.

Authentifizieren von Azure für den Datenaustausch

In diesem Abschnitt wird beschrieben, wie Sie sich authentifizieren, wenn Sie den Azure Blob-Speicher für den Datenaustausch verwenden.

Die Authentifizierung auf diese Weise ist nur erforderlich, wenn einer der folgenden Umstände gilt:

  • Die Version des Snowflake-Konnektors für Spark ist 2.1.x (oder niedriger). Ab Version 2.2.0 verwendet der Konnektor für den Datenaustausch einen internen temporären Stagingbereich von Snowflake. Wenn Sie derzeit nicht die Version 2.2.0 (oder höher) des Konnektors verwenden, empfiehlt Snowflake dringend ein Upgrade auf die neueste Version.

  • Die Version des Snowflake-Konnektors für Spark ist 2.2.0 (oder höher), aber Ihre Jobs dauern regelmäßig länger als 36 Stunden. Dies ist die maximale Dauer für das Azure-Token, das vom Konnektor verwendet wird, um für den Datenaustausch auf den internen Stagingbereich zuzugreifen.

Sie müssen einen externen Azure Blob-Speichercontainer vorbereiten, über den der Konnektor Daten zwischen Snowflake und Spark austauschen kann.

Verwenden von Azure-Anmeldeinformationen

Dies ist die Standard-Authentifizierungsmethode für Azure Blob-Speicher. Sie erfordert ein Wertepaar aus tempDir-Wert (eine URL) und temporary_azure_sas_token-Wert.

Bemerkung

Diese Werte sollten auch verwendet werden, um Hadoop/Spark für den Zugriff auf Azure Blob-Speicher zu konfigurieren. Weitere Informationen dazu, einschließlich Beispiele, finden Sie unter Authentifizieren von Hadoop/Spark mit Azure (unter diesem Thema).

Beispiel:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

// Then, configure your Snowflake environment
//
val sfOptions = Map(
  "sfURL" -> "<account_name>.snowflakecomputing.com",
  "sfUser" -> "<user_name>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database_name>",
  "sfSchema" -> "<schema_name>",
  "sfWarehouse" -> "<warehouse_name>",
  "sfCompress" -> "on",
  "sfSSL" -> "on",
  "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/",
  "temporary_azure_sas_token" -> "<azure_sas>"
)

Details zu den von sfOptions unterstützten Optionen finden Sie unter Azure-Optionen für externe Datenübertragung (unter diesem Thema).

Authentifizieren von Hadoop/Spark mit Azure

Um diese Methode zu verwenden, ändern Sie die Scala-Beispiele unter diesem Thema, und fügen Sie dann die folgenden Hadoop-Konfigurationsoptionen hinzu:

val hadoopConf = sc.hadoopConfiguration
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

Stellen Sie sicher, dass die Option tempdir ebenfalls wasb:// verwendet.

Authentifizieren über einen Browser wird nicht unterstützt

Bei Verwendung des Spark-Konnektors ist es nicht praktisch, eine Form von Authentifizierung zu verwenden, bei der ein Browserfenster geöffnet wird, in dem der Benutzer zur Eingabe von Anmeldeinformationen aufgefordert wird. Dieses Fenster würde auf dem Clientcomputer nicht unbedingt angezeigt. Daher unterstützt der Spark-Konnektor keine Art von Authentifizierung, einschließlich MFA (Multi-Factor Authentication) oder SSO (Single Sign-On), bei der ein Fenster im Browser geöffnet werden würde.