Verwenden des Spark-Konnektors¶
Der Konnektor entspricht der Standard-API von Spark, bietet jedoch zusätzliche Snowflake-spezifische Optionen, die unter diesem Thema beschrieben werden.
COPY bezieht sich unter diesem Thema auf:
COPY INTO <Tabelle> (zur Übertragung von Daten aus einem internen oder externen Stagingbereich in eine Tabelle).
COPY INTO <Speicherort> (zur Übertragung von Daten aus einer Tabelle in einen internen oder externen Stagingbereich).
Unter diesem Thema:
Überprüfen der Netzwerkverbindung zu Snowflake mit SnowCD¶
Nach der Konfiguration des Treibers können Sie die Netzwerkkonnektivität zu Snowflake mit SnowCD testen und Probleme beheben.
Sie können während der Erstkonfiguration und bei Bedarf jederzeit SnowCD verwenden, um Ihre Netzwerkverbindung zu Snowflake zu testen und Probleme zu beheben.
Pushdown¶
Der Spark-Konnektor wendet Prädikat- und Abfrage-Pushdown an, indem die logischen Pläne von Spark für SQL-Operationen erfasst und analysiert werden. Wenn die Datenquelle Snowflake ist, werden die Operationen in eine SQL-Abfrage übersetzt und dann in Snowflake ausgeführt, um die Leistung zu verbessern.
Da für diese Übersetzung jedoch eine nahezu Eins-zu-Eins-Übersetzung der Spark-SQL-Operatoren in Snowflake-Ausdrücke erforderlich ist, können nicht alle Spark-SQL-Operatoren für Pushdown verwendet werden. Wenn Pushdown fehlschlägt, greift der Konnektor auf einen weniger optimierten Ausführungsplan zurück. Die nicht unterstützten Operationen werden stattdessen in Spark ausgeführt.
Bemerkung
Wenn Sie Pushdown für alle Operationen benötigen, sollten Sie Ihren Code so schreiben, dass dieser stattdessen Snowpark-API verwendet.
Nachfolgend finden Sie eine Liste der unterstützten Pushdown-Operationen (für alle unten aufgeführten Funktionen werden deren Spark-Namen verwendet). Wenn eine Funktion nicht in dieser Liste enthalten ist, wird möglicherweise ein Spark-Plan, der diese Funktion verwendet, auf Spark ausgeführt, anstatt den Pushdown zu Snowflake durchzuführen.
Aggregatfunktionen
Average
Corr
CovPopulation
CovSample
Count
Max
Min
StddevPop
StddevSamp
Sum
VariancePop
VarianceSamp
Boolesche Operatoren
And
Between
Contains
EndsWith
EqualTo
GreaterThan
GreaterThanOrEqual
In
IsNull
IsNotNull
LessThan
LessThanOrEqual
Not
Or
StartsWith
Funktionen für Datum, Uhrzeit und Zeitstempel
DateAdd
DateSub
Month
Quarter
TruncDate
TruncTimestamp
Year
Mathematische Funktionen
Arithmetische Operatoren „+“ (Addition), „-“ (Subtraktion), „*“ (Multiplikation), „/“ (Division) und „-“ (unäre Negation).
Abs
Acos
Asin
Atan
Ceil
CheckOverflow
Cos
Cosh
Exp
Floor
Greatest
Least
Log
Pi
Pow
PromotePrecision
Rand
Round
Sin
Sinh
Sqrt
Tan
Tanh
Verschiedene Operatoren
Alias (AS-Ausdrücke)
BitwiseAnd
BitwiseNot
BitwiseOr
BitwiseXor
CaseWhen
Cast(Unterelement, t, _)
Coalesce
If
MakeDecimal
ScalarSubquery
ShiftLeft
ShiftRight
SortOrder
UnscaledValue
Relationale Operatoren
Aggregatfunktionen und Group-By-Klauseln
Distinct
Filter
In
InSet
Joins
Einschränkungen
Projektionen
Sortierungen (ORDER BY)
Union und Union All
Fensterfunktionen und Fensterklauseln
Zeichenfolgenfunktionen
Ascii
Concat(Unterelemente)
Length
Like
Lower
StringLPad
StringRPad
StringTranslate
StringTrim
StringTrimLeft
StringTrimRight
Teilzeichenfolge
Upper
Fensterfunktionen (Hinweis: Diese funktionieren nicht mit Spark 2.2)
DenseRank
Rank
RowNumber
Verwenden des Konnektors in Scala¶
Angeben des Namens der Datenquellenklasse¶
Damit Snowflake als Datenquelle in Spark verwendet werden kann, geben Sie mit der Option .format
den Namen der Snowflake-Konnektorklasse an, die die Datenquelle definiert.
net.snowflake.spark.snowflake
Um beim Kompilieren eine Prüfung des Klassennamens sicherzustellen, empfiehlt Snowflake dringend, eine Variable für den Klassennamen zu definieren. Beispiel:
val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
Außerdem stellt die Klasse Utils
die Variable zur Verfügung, die wie folgt importiert werden kann:
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
Bemerkung
Alle Beispiele unter diesem Thema verwenden SNOWFLAKE_SOURCE_NAME
als Klassendefinition.
Aktivieren/Deaktivieren von Pushdown in einer Sitzung¶
Version 2.1.0 (und höher) des Konnektors unterstützt die Abfrageverschiebung (Pushdown), wodurch die Performance deutlich gesteigert wird. Dabei wird die Verarbeitung von Abfragen an Snowflake übergeben, wenn Snowflake die Spark-Datenquelle ist.
Standardmäßig ist Pushdown aktiviert.
Zur Deaktivierung des Pushdowns innerhalb einer Spark-Sitzung für einen bestimmten DataFrame:
Nach der Instanziierung eines
SparkSession
-Objekts rufen Sie die statische MethodeSnowflakeConnectorUtils.disablePushdownSession
auf und übergeben dasSparkSession
Objekt. Beispiel:SnowflakeConnectorUtils.disablePushdownSession(spark)
Dabei ist
spark
IhrSparkSession
-Objekt.Erstellen Sie einen DataFrame mit der Option autopushdown, die auf
off
gesetzt ist. Beispiel:val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("query", query) .option("autopushdown", "off") .load()
Beachten Sie, dass Sie die Option
autopushdown
auch in einerMap
festlegen können, die Sie dann an dieoptions
-Methode übergeben (z. B. insfOptions
im obigen Beispiel).
Um Pushdown nach der Deaktivierung wieder zu aktivieren, rufen Sie die statische Methode SnowflakeConnectorUtils.enablePushdownSession
auf (mit Übergabe des Objekts SparkSession
) und erstellen ein DataFrame mit aktiviertem autopushdown
.
Verschieben von Daten von Snowflake zu Spark¶
Bemerkung
Wenn Sie DataFrames verwenden, unterstützt der Snowflake-Konnektor nur SELECT-Abfragen.
So lesen Sie Daten von Snowflake in einen Spark-DataFrame ein:
Verwenden Sie die
read()
-Methode desSqlContext
-Objekts, um einenDataFrameReader
zu erstellen.Geben Sie
SNOWFLAKE_SOURCE_NAME
mit der Methodeformat()
an. Weitere Informationen zur Definition finden Sie unter Angeben des Namens der Datenquellenklasse (unter diesem Thema).Geben Sie die Konnektoroptionen mit der Methode
option()
oderoptions()
an. Weitere Informationen dazu finden Sie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema).Geben Sie eine der folgenden Optionen für die zu lesenden Tabellendaten an:
dbtable
: Der Name der zu lesenden Tabelle. Alle Spalten und Datensätze werden abgerufen (d. h. es ist gleichbedeutend mitSELECT * FROM db_table
).query
: Die genaue Abfrage (SELECT-Anweisung), die ausgeführt werden soll.
Nutzungshinweise¶
Derzeit unterstützt der Konnektor bei Verwendung von DataFrames keine anderen Typen von Abfragen (wie SHOW, DESC oder DML-Anweisungen).
Es gibt eine Obergrenze für die Größe einer einzelnen Zeile. Weitere Details dazu finden Sie unter Begrenzung der Abfragetextgröße.
Hinweise zur Performance¶
Wenn Sie Daten zwischen Snowflake und Spark übertragen, verwenden Sie die folgenden Methoden, um die Performance zu analysieren bzw. zu verbessern:
Verwenden Sie die
net.snowflake.spark.snowflake.Utils.getLastSelect()
-Methode, um die aktuelle Abfrage anzuzeigen, die beim Verschieben von Daten von Snowflake nach Spark ausgegeben wird.Wenn Sie die Funktionalität
filter
oderwhere
des Spark-DataFrame verwenden, überprüfen Sie, ob in der ausgeführten SQL-Abfrage die entsprechenden Filter vorhanden sind. Der Snowflake-Konnektor versucht, alle von Spark angeforderten Filter in SQL zu übersetzen.Es gibt jedoch Formen von Filtern, die die Spark-Infrastruktur derzeit nicht an den Snowflake-Konnektor übergibt. Infolgedessen wird in einigen Situationen eine große Anzahl unnötiger Datensätze von Snowflake angefordert.
Wenn Sie nur eine Teilmenge von Spalten benötigen, stellen Sie sicher, dass die Teilmenge in der SQL-Abfrage angegeben ist.
Im Allgemeinen, wenn die ausgegebene SQL-Abfrage nicht mit dem übereinstimmt, was Sie aufgrund der DataFrame-Operationen erwarten, verwenden Sie die Option
query
, um genau die gewünschte SQL-Syntax bereitzustellen.
Beispiele¶
Lesen einer vollständigen Tabelle:
val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t1") .load()
Lesen der Ergebnisse einer Abfrage:
val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1") .load()
Verschieben von Daten von Spark in Snowflake¶
Die Schritte zum Speichern des Inhalts eines DataFrame in eine Snowflake-Tabelle sind ähnlich dem Schreiben von Snowflake in Spark:
Verwenden Sie die
write()
-Methode desDataFrame
, um einenDataFrameWriter
zu erstellen.Geben Sie
SNOWFLAKE_SOURCE_NAME
mit der Methodeformat()
an. Weitere Informationen zur Definition finden Sie unter Angeben des Namens der Datenquellenklasse (unter diesem Thema).Geben Sie die Konnektoroptionen mit der Methode
option()
oderoptions()
an. Weitere Informationen dazu finden Sie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema).Verwenden Sie die Option
dbtable
, um die Tabelle anzugeben, in die die Daten geschrieben werden sollen.Verwenden Sie die Methode
mode()
, um den Speichermodus für den Inhalt festzulegen.Weitere Informationen dazu finden Sie unter SaveMode (Spark-Dokumentation).
Beispiele¶
df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t2") .mode(SaveMode.Overwrite) .save()
Exportieren von JSON-Daten von Spark in Snowflake¶
Spark-DataFrames können JSON-Objekte enthalten, die als Zeichenfolgen serialisiert sind. Der folgende Code zeigt ein Beispiel für die Konvertierung eines regulären DataFrame in einen DataFrame, der JSON-Daten enthält:
val rdd = myDataFrame.toJSON val schema = new StructType(Array(StructField("JSON", StringType))) val jsonDataFrame = sqlContext.createDataFrame( rdd.map(s => Row(s)), schema)
Beachten Sie, dass der resultierende jsonDataFrame
eine einzelne Spalte vom Typ StringType
ist. Wenn dieser DataFrame mit dem üblichen SaveMode.Overwrite
-Modus in Snowflake exportiert wird, wird eine neue Tabelle in Snowflake mit einer einzelnen Spalte vom Typ VARCHAR
erstellt.
So laden Sie einen jsonDataFrame
in eine VARIANT
-Spalte:
Erstellen Sie eine Snowflake-Tabelle (Verbinden mit Snowflake in Java mithilfe des Snowflake-JDBC-Treibers). Erläuterungen zu den im Beispiel verwendeten Verbindungsparametern finden Sie unter Übersicht der Verbindungsparameter für den JDBC-Treiber.
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; public class SnowflakeJDBCExample { public static void main(String[] args) throws Exception { String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.com/"; Properties properties = new Properties(); properties.put("user", "peter"); properties.put("password", "test"); properties.put("account", "myorganization-myaccount"); properties.put("warehouse", "mywh"); properties.put("db", "mydb"); properties.put("schema", "public"); // get connection System.out.println("Create JDBC connection"); Connection connection = DriverManager.getConnection(jdbcUrl, properties); System.out.println("Done creating JDBC connection\n"); // create statement System.out.println("Create JDBC statement"); Statement statement = connection.createStatement(); System.out.println("Done creating JDBC statement\n"); // create a table System.out.println("Create my_variant_table table"); statement.executeUpdate("create or replace table my_variant_table(json VARIANT)"); statement.close(); System.out.println("Done creating demo table\n"); connection.close(); System.out.println("Close connection\n"); } }
Um die vorhandene Tabelle wiederzuverwenden, verwenden Sie
SaveMode.Append
anstelle vonSaveMode.Overwrite
. Wenn der Zeichenfolgenwert für JSON in Snowflake geladen wird, weil die Zielspalte vom Typ VARIANT ist, wird er als JSON analysiert. Beispiel:df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "my_variant_table") .mode(SaveMode.Append) .save()
Ausführen von DDL/DML-SQL-Anweisungen¶
Verwenden Sie die runQuery()
-Methode des Utils
-Objekts, um zusätzlich zu Abfragen DDL/DML-SQL-Anweisungen auszuführen, z. B.:
var sfOptions = Map(
"sfURL" -> "<account_identifier>.snowflakecomputing.com",
"sfUser" -> "<user_name>",
"sfPassword" -> "<password>",
"sfDatabase" -> "<database>",
"sfSchema" -> "<schema>",
"sfWarehouse" -> "<warehouse>"
)
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")
wobei sfOptions
die Parameterzuordnung ist, die zum Lesen/Schreiben von DataFrames verwendet wird.
Die Methode runQuery
gibt nur TRUE oder FALSE zurück. Sie ist für Anweisungen gedacht, die kein Resultset zurückgeben, z. B. DDL-Anweisungen wie CREATE TABLE
und DML-Anweisungen wie INSERT
, UPDATE
und DELETE
. Das ist hilft nicht bei Anweisungen, die ein Resultset zurückgeben, z. B. SELECT
oder SHOW
.
Arbeiten mit Zeitstempeln und Zeitzonen¶
Spark bietet nur einen Typ von Zeitstempel, der dem Scala/Java-Zeitstempeltyp entspricht. Dieser ist im Verhalten nahezu identisch mit dem Datentyp TIMESTAMP_LTZ (lokale Zeitzone) von Snowflake. Daher empfiehlt Snowflake beim Übertragen von Daten zwischen Spark und Snowflake die folgenden Ansätze, um die Zeit im Verhältnis zu den Zeitzonen korrekt zu erhalten:
Verwenden Sie in Snowflake nur den Datentyp TIMESTAMP_LTZ.
Bemerkung
Der standardmäßig zugeordnete Datentyp für den Zeitstempel ist TIMESTAMP_NTZ (keine Zeitzone). Daher müssen Sie ausdrücklich den Parameter TIMESTAMP_TYPE_MAPPING festlegen, um TIMESTAMP_LTZ verwenden zu können.
Stellen Sie die Spark-Zeitzone auf
UTC
, und verwenden Sie diese Zeitzone in Snowflake (d. h. setzen Sie nicht die OptionsfTimezone
für den Konnektor, und setzen Sie nicht explizit eine Zeitzone in Snowflake). In diesem Szenario sind TIMESTAMP_LTZ und TIMESTAMP_NTZ praktisch gleichwertig.Zum Einstellen der Zeitzone fügen Sie Ihrem Spark-Code die folgende Zeile hinzu:
java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
Wenn Sie keinen dieser Ansätze implementieren, können unerwünschte Zeitänderungen auftreten. Betrachten Sie beispielsweise das folgende Szenario:
Die Zeitzone in Spark ist auf
America/New_York
eingestellt.Die Zeitzone in Snowflake ist mit einer der beiden folgenden Methoden auf
Europe/Warsaw
eingestellt:Setzen Sie
sfTimezone
für den Konnektor aufEurope/Warsaw
.Setzen Sie
sfTimezone
für den Konnektor aufsnowflake
und den Sitzungsparameter TIMEZONE in Snowflake aufEurope/Warsaw
.
Sowohl TIMESTAMP_NTZ als auch TIMESTAMP_LTZ werden in Snowflake verwendet.
In diesem Szenario gilt:
Wenn ein Wert, der
12:00:00
in einer TIMESTAMP_NTZ-Spalte in Snowflake repräsentiert, an Spark gesendet wird, enthält dieser Wert keine Zeitzoneninformationen. Spark behandelt den Wert als12:00:00
in New York.Wenn Spark diesen Wert
12:00:00
(in New York) zurück an Snowflake sendet, um ihn in eine TIMESTAMP_LTZ-Spalte zu laden, wird er automatisch umgewandelt und als18:00:00
(für die Zeitzone Warschau) geladen.Wenn dieser Wert dann in Snowflake in TIMESTAMP_NTZ umgewandelt wird, wird dem Benutzer
18:00:00
angezeigt, was sich vom ursprünglichen Wert12:00:00
unterscheidet.
Zusammenfassend empfiehlt Snowflake, mindestens eine der folgenden Regeln strikt einzuhalten:
Verwenden Sie die gleiche Zeitzone, idealerweise
UTC
, sowohl für Spark als auch für Snowflake.Verwenden Sie nur den Datentyp TIMESTAMP_LTZ für die Datenübertragung zwischen Spark und Snowflake.
Scala-Beispielprogramm¶
Wichtig
Dieses Beispielprogramm geht davon aus, dass Sie die Version 2.2.0 (oder höher) des Konnektors verwenden, der einen internen Snowflake-Stagingbereich zum Speichern temporärer Daten verwendet und daher keinen S3-Speicherort zum Speichern temporärer Daten benötigt. Wenn Sie eine frühere Version verwenden, müssen Sie über einen vorhandenen S3-Speicherort verfügen und Werte für tempdir
, awsAccessKey
, awsSecretKey
für sfOptions
angeben. Weitere Details dazu finden Sie unter AWS-Optionen für externe Datenübertragung (unter diesem Thema).
Das folgende Scala-Programm bietet einen vollständigen Anwendungsfall für den Snowflake-Konnektor für Spark. Ersetzen Sie vor der Verwendung des Codes die folgenden Zeichenfolgen durch die entsprechenden Werte, wie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema) beschrieben:
<Kontobezeichner>
: Ihr Kontobezeichner.<Benutzername>
,<Kennwort>
: Anmeldeinformationen für den Snowflake-Benutzer.<Datenbank>
,<Schema>
,<Warehouse>
: Standardwerte für die Snowflake-Sitzung.
Das Scala-Beispielprogramm verwendet die Basisauthentifizierung (d. h. Benutzername und Kennwort). Wenn Sie sich mit OAuth authentifizieren möchten, finden Sie die entsprechenden Informationen unter Verwenden von External OAuth (unter diesem Thema).
import org.apache.spark.sql._
//
// Configure your Snowflake environment
//
var sfOptions = Map(
"sfURL" -> "<account_identifier>.snowflakecomputing.com",
"sfUser" -> "<user_name>",
"sfPassword" -> "<password>",
"sfDatabase" -> "<database>",
"sfSchema" -> "<schema>",
"sfWarehouse" -> "<warehouse>"
)
//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", "t1")
.load()
//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("query", "select c1, count(*) from t1 group by c1")
.load()
//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", "t2")
.mode(SaveMode.Overwrite)
.save()
Verwenden des Konnektors mit Python¶
Die Verwendung des Konnektors mit Python ist der Verwendung von Scala sehr ähnlich.
Wir empfehlen Ihnen, das in der Spark-Distribution enthaltene bin/pyspark
-Skript zu verwenden.
Konfigurieren des pyspark
-Skripts¶
Das pyspark
-Skript muss ähnlich wie das spark-shell
-Skript konfiguriert werden, wobei die Optionen --packages
oder --jars
verwendet werden. Beispiel:
bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3
Vergessen Sie nicht, den Snowflake-Konnektor für Spark und die .jar-Dateien des JDBC-Konnektors in die CLASSPATH-Umgebungsvariable aufzunehmen.
Weitere Informationen zur Konfiguration des spark-shell
-Skripts finden Sie unter Schritt 4: Lokales Spark-Cluster oder die von Amazon EMR gehostete Spark-Umgebung konfigurieren.
Aktivieren/Deaktivieren von Pushdown in einer Sitzung¶
Version 2.1.0 (und höher) des Konnektors unterstützt die Abfrageverschiebung (Pushdown), wodurch die Performance deutlich gesteigert wird. Dabei wird die Verarbeitung von Abfragen an Snowflake übergeben, wenn Snowflake die Spark-Datenquelle ist.
Standardmäßig ist Pushdown aktiviert.
Zur Deaktivierung des Pushdowns innerhalb einer Spark-Sitzung für einen bestimmten DataFrame:
Nach der Instanziierung eines
SparkSession
-Objekts rufen Sie die statische MethodeSnowflakeConnectorUtils.disablePushdownSession
auf und übergeben dasSparkSession
Objekt. Beispiel:sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
Erstellen Sie einen DataFrame mit der Option autopushdown, die auf
off
gesetzt ist. Beispiel:df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", query) \ .option("autopushdown", "off") \ .load()
Beachten Sie, dass Sie die Option
autopushdown
auch in einemDictionary
festlegen können, das Sie dann an dieoptions
-Methode übergeben (z. B. insfOptions
im obigen Beispiel).
Um Pushdown nach der Deaktivierung wieder zu aktivieren, rufen Sie die statische Methode SnowflakeConnectorUtils.enablePushdownSession
auf (mit Übergabe des Objekts SparkSession
) und erstellen ein DataFrame mit aktiviertem autopushdown
.
Beispiel für ein Python-Skript¶
Wichtig
Dieses Beispielskript geht davon aus, dass Sie die Version 2.2.0 (oder höher) des Konnektors verwenden, der einen internen Snowflake-Stagingbereich zum Speichern temporärer Daten verwendet und daher keinen S3-Speicherort zum Speichern dieser Daten benötigt. Wenn Sie eine frühere Version verwenden, müssen Sie über einen vorhandenen S3-Speicherort verfügen und Werte für tempdir
, awsAccessKey
, awsSecretKey
für sfOptions
angeben. Weitere Details dazu finden Sie unter AWS-Optionen für externe Datenübertragung (unter diesem Thema).
Nachdem das pyspark
-Skript konfiguriert wurde, können Sie SQL-Abfragen und andere Operationen ausführen. Hier ist ein Beispiel für ein Python-Skript, das eine einfache SQL-Abfrage ausführt. Dieses Skript veranschaulicht die grundlegende Verwendung des Konnektors. Die meisten Scala-Beispiele in diesem Dokument können mit minimalem Aufwand für die Verwendung mit Python angepasst werden.
Das Python-Beispielskript verwendet die Basisauthentifizierung (d. h. Benutzername und Kennwort). Wenn Sie sich mit OAuth authentifizieren möchten, finden Sie die entsprechenden Informationen unter Verwenden von External OAuth (unter diesem Thema).
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext
sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')
# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")
# Set options below
sfOptions = {
"sfURL" : "<account_identifier>.snowflakecomputing.com",
"sfUser" : "<user_name>",
"sfPassword" : "<password>",
"sfDatabase" : "<database>",
"sfSchema" : "<schema>",
"sfWarehouse" : "<warehouse>"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("query", "select 1 as my_num union all select 2 as my_num") \
.load()
df.show()
Tipp
Beachten Sie die Verwendung von sfOptions
und SNOWFLAKE_SOURCE_NAME
. Dies vereinfacht den Code und reduziert die Wahrscheinlichkeit von Fehlern.
Details zu den unterstützten Optionen für sfOptions
finden Sie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema).
Zuordnen von Datentypen¶
Der Spark-Konnektor unterstützt die Konvertierung zwischen vielen gängigen Datentypen.
Von Spark SQL zu Snowflake¶
Spark-Datentyp
Snowflake-Datentyp
ArrayType
VARIANT
BinaryType
Nicht unterstützt
BooleanType
BOOLEAN
ByteType
INTEGER. Snowflake unterstützt den Typ BYTE nicht.
DateType
DATE
DecimalType
DECIMAL
DoubleType
DOUBLE
FloatType
FLOAT
IntegerType
INTEGER
LongType
INTEGER
MapType
VARIANT
ShortType
INTEGER
StringType
Wenn Länge angegeben ist, VARCHAR(N); andernfalls VARCHAR
StructType
VARIANT
TimestampType
TIMESTAMP
Von Snowflake zu Spark SQL¶
Snowflake-Datentyp
Spark-Datentyp
ARRAY
StringType
BIGINT
DecimalType(38, 0)
BINARY
Nicht unterstützt
BLOB
Nicht unterstützt
BOOLEAN
BooleanType
CHAR
StringType
CLOB
StringType
DATE
DateType
DECIMAL
DecimalType
DOUBLE
DoubleType
FLOAT
DoubleType
INTEGER
DecimalType(38, 0)
OBJECT
StringType
TIMESTAMP
TimestampType
TIME
StringType
(Spark-Konnektor-Version 2.4.14 oder höher)VARIANT
StringType
Aufrufen der Methode DataFrame.show¶
Wenn Sie die Methode DataFrame.show
aufrufen und eine Zahl übergeben, die kleiner ist als die Anzahl der Zeilen im DataFrame, erstellen Sie ein DataFrame, das nur die Zeilen enthält, die in sortierter Reihenfolge angezeigt werden sollen.
Gehen Sie dabei wie folgt vor:
Rufen Sie zuerst die Methode
sort
auf, um einen DataFrame zurückzugeben, der sortierte Zeilen enthält.Rufen Sie die Methode
limit
auf diesem DataFrame auf, um einen DataFrame zurückzugeben, der nur die Zeilen enthält, die Sie anzeigen möchten.Rufen Sie die Methode
show
auf dem zurückgegebenen DataFrame auf.
So können Sie beispielsweise 5 Zeilen anzeigen und die Ergebnisse nach der Spalte my_col
sortieren:
val dfWithRowsToShow = originalDf.sort("my_col").limit(5) dfWithRowsToShow.show(5)
Wenn Sie andernfalls show
aufrufen, um eine Teilmenge von Zeilen in DataFrame anzuzeigen, könnten verschiedene Ausführungen des Codes dazu führen, dass unterschiedliche Zeilen angezeigt werden.
Einstellen der Konfigurationsoptionen für den Konnektor¶
In den folgenden Abschnitten werden die Optionen aufgeführt, mit denen Sie das Verhalten des Konnektors konfigurieren können:
Um diese Optionen festzulegen, rufen Sie die Methode .option(<Schlüssel>, <Wert>)
oder die .options(<Zuordnung>)
-Methode der Klasse Spark DataframeReader auf.
Tipp
Um die Verwendung der Optionen zu vereinfachen, empfiehlt Snowflake, die Optionen in einem einzigen Map
-Objekt anzugeben und .options(<Zuordnung>)
aufzurufen, um die Optionen zu festzulegen.
Erforderliche Verbindungsoptionen¶
Die folgenden Optionen sind für die Verbindung mit Snowflake erforderlich:
sfUrl
Gibt den Hostnamen für Ihr Konto im folgenden Format an:
account_identifier.snowflakecomputing.com
account_identifier
ist ihr Kontobezeichner.sfUser
Anmeldename für den Snowflake-Benutzer.
Außerdem müssen Sie zur Authentifizierung eine der folgenden Optionen verwenden:
sfPassword
Kennwort für den Snowflake-Benutzer.
pem_private_key
Privater Schlüssel (im PEM-Format) für die Schlüsselpaar-Authentifizierung. Eine Anleitung dazu finden Sie unter Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation.
sfAuthenticator
Gibt an, dass zum Authentifizieren bei Snowflake External OAuth verwendet wird. Setzen Sie den Wert auf
oauth
.Für die Verwendung von External OAuth muss der Parameter
sfToken
festgelegt werden.
sfToken
(Erforderlich bei Verwendung von External OAuth) Setzen Sie den Wert auf Ihr External OAuth-Zugriffstoken.
Für diesen Verbindungsparameter muss der Parameterwert
sfAuthenticator
aufoauth
gesetzt werden.Der Standardwert ist „none“.
Erforderliche Kontextoptionen¶
Die folgenden Optionen sind erforderlich, um den Datenbank- und Schemakontext für die Sitzung festzulegen:
sfDatabase
Die Datenbank, die nach dem Verbinden für die Sitzung verwendet werden soll.
sfSchema
Das Schema, das nach dem Verbinden für die Sitzung verwendet werden soll.
Zusätzliche Kontextoptionen¶
Die in diesem Abschnitt aufgeführten Optionen sind nicht erforderlich.
sfAccount
Kontobezeichner (z. B.
myorganization-myaccount
). Diese Option ist nicht mehr erforderlich, da der Kontobezeichner insfUrl
angegeben ist. Sie ist hier nur aus Gründen der Abwärtskompatibilität dokumentiert.sfWarehouse
Das standardmäßige virtuelle Warehouse, das nach dem Verbinden für die Sitzung verwendet werden soll.
sfRole
Die Standardsicherheitsrolle, die nach dem Verbinden für die Sitzung verwendet werden soll.
Proxy-Optionen¶
Die in diesem Abschnitt aufgeführten Optionen sind nicht erforderlich.
use_proxy
Gibt an, ob der Konnektor einen Proxy verwenden soll:
true
gibt an, dass der Konnektor einen Proxy verwenden soll.false
gibt an, dass der Konnektor keinen Proxy verwenden soll.
Der Standardwert ist
false
.proxy_host
(Erforderlich, wenn
use_proxy
den Werttrue
hat) Gibt den Hostnamen des zu verwendenden Proxyservers an.proxy_port
(Erforderlich, wenn
use_proxy
den Werttrue
hat) Gibt die Portnummer des zu verwendenden Proxyservers an.proxy_protocol
Gibt das Protokoll an, das für die Verbindung zum Proxyserver verwendet wird. Sie können einen der folgenden Werte angeben:
http
https
Der Standardwert ist
http
.Dies wird nur für Snowflake auf AWS unterstützt.
Diese Option wurde in Version 2.11.1 des Spark-Konnektors hinzugefügt.
proxy_user
Gibt den Benutzernamen für die Authentifizierung beim Proxyserver an. Legen Sie dies fest, wenn der Proxyserver eine Authentifizierung erfordert.
Dies wird nur für Snowflake auf AWS unterstützt.
proxy_password
Gibt das Kennwort von
proxy_user
für die Authentifizierung beim Proxyserver an. Legen Sie dies fest, wenn der Proxyserver eine Authentifizierung erfordert.Dies wird nur für Snowflake auf AWS unterstützt.
non_proxy_hosts
Gibt die Liste der Hosts an, mit denen sich der Konnektor unter Umgehung des Proxyservers direkt verbinden soll.
Verwenden Sie das URL-Escapezeichen
%7C
(Pipe-Symbol), um den Hostnamen zu separieren. Sie können auch ein Sternchen (*
) als Platzhalter verwenden.Dies wird nur für Snowflake auf AWS unterstützt.
Zusätzliche Optionen¶
Die in diesem Abschnitt aufgeführten Optionen sind nicht erforderlich.
sfTimezone
Die Zeitzone, die Snowflake bei der Verwendung von Spark verwenden soll. Beachten Sie, dass der Parameter nur die Zeitzone in Snowflake festlegt. Die Spark-Umgebung bleibt unverändert. Folgende Werte werden unterstützt:
spark
: Verwendet die Zeitzone von Spark (Standard).snowflake
: Verwendet die aktuelle Zeitzone für Snowflake.sf_default
: Verwendet die Standardzeitzone des Snowflake-Benutzers, der sich verbindet.time_zone
: Verwenden Sie eine bestimmte Zeitzone (z. B.America/New_York
), falls gültig.Weitere Informationen zu den Auswirkungen dieser Option finden Sie unter Arbeiten mit Zeitstempeln und Zeitzonen (unter diesem Thema).
sfCompress
Wenn auf
on
(Standard) eingestellt, werden die zwischen Snowflake und Spark übermittelten Daten komprimiert.s3MaxFileSize
Die Größe der Datei, die beim Verschieben von Daten von Snowflake nach Spark verwendet wird. Die Voreinstellung ist 10 MB.
preactions
Eine durch Semikolon getrennte Liste von SQL-Befehlen, die ausgeführt werden, bevor Daten zwischen Spark und Snowflake übertragen werden.
Wenn ein SQL-Befehl
%s
enthält, wird%s
durch den Tabellennamen ersetzt, auf den für die Operation verwiesen wird.postactions
Eine durch Semikolon getrennte Liste von SQL-Befehlen, die ausgeführt werden, nachdem Daten zwischen Spark und Snowflake übertragen wurden.
Wenn ein SQL-Befehl
%s
enthält, wird dies durch den Tabellennamen ersetzt, auf den für die Operation verwiesen wird.truncate_columns
Wenn auf
on
(Standard) gesetzt, werden beim COPY-Befehl Textzeichenfolgen automatisch abgeschnitten, die die Länge der Zielspalte überschreiten. Wenn aufoff
gesetzt, erstellt der Befehl beim Überschreiten der Länge der Zielspalte einen Fehler.truncate_table
Dieser Parameter steuert, ob Snowflake das Schema einer Snowflake-Zieltabelle beim Überschreiben dieser Tabelle beibehalten wird.
Wenn eine Zieltabelle in Snowflake überschrieben wird, wird standardmäßig auch das Schema dieser Zieltabelle überschrieben. Das neue Schema basiert auf dem Schema der Quelltabelle (dem Spark-DataFrame).
Manchmal ist das Schema der Quelle jedoch nicht ideal. Ein Benutzer kann beispielsweise wünschen, dass eine Snowflake-Zieltabelle in Zukunft FLOAT-Werte speichern kann, obwohl der Datentyp der ursprünglichen Quellspalte INTEGER ist. In diesem Fall sollte das Schema der Snowflake-Tabelle nicht überschrieben werden. Die Snowflake-Tabelle sollte lediglich abgeschnitten und dann mit ihrem aktuellen Schema wiederverwendet werden.
Die möglichen Werte dieses Parameters sind:
on
off
Wenn dieser Parameter
on
ist, wird das ursprüngliche Schema der Zieltabelle beibehalten. Wenn dieser Parameteroff
ist, dann wird das alte Schema der Tabelle ignoriert und ein neues Schema basierend auf dem Schema der Quelle erzeugt.Dieser Parameter ist optional.
Der Standardwert dieses Parameters ist
off
(d. h. standardmäßig wird das ursprüngliche Tabellenschema überschrieben).Weitere Details zum Zuordnen von Spark-Datentypen zu Snowflake-Datentypen (und umgekehrt) finden Sie unter Zuordnen von Datentypen (unter diesem Thema).
continue_on_error
Diese Variable steuert, ob der COPY-Befehl abgebrochen wird, wenn der Benutzer ungültige Daten eingibt (z. B. ungültiges JSON-Format für eine Spalte vom Datentyp Variant).
Die möglichen Werte sind:
on
off
Der Wert
on
bedeutet, dass bei Auftreten eines Fehlers der Vorgang fortgesetzt wird. Der Wertoff
bedeutet, dass bei Auftreten eines Fehlers der Vorgang abgebrochen wird.Dieser Parameter ist optional.
Der Standardwert dieses Parameters ist
off
.Es wird nicht empfohlen, diese Option zu aktivieren. Wenn beim Kopieren (COPY) in Snowflake mit dem Spark-Konnektor ein Fehler gemeldet wurde, dann führt dies wahrscheinlich zu fehlenden Daten.
Bemerkung
Wenn Zeilen abgelehnt werden oder fehlen und diese Zeilen in der Eingabequelle nicht eindeutig fehlerhaft sind, wenden Sie sich an Snowflake.
usestagingtable
Dieser Parameter steuert, ob das Laden von Daten über eine Stagingtabelle erfolgt.
Eine Staging-Tabelle ist eine normale Tabelle (mit einem temporären Namen), die vom Konnektor erstellt wird. Wenn die Datenladeoperation erfolgreich ist, wird die ursprüngliche Zieltabelle gelöscht, und die Staging-Tabelle in den Namen der ursprünglichen Zieltabelle umbenannt. Wenn die Datenladeoperation fehlschlägt, wird die Staging-Tabelle gelöscht, und die Zieltabelle wird mit den Daten belassen, die sie unmittelbar vor der Operation hatte. Somit ermöglicht die Staging-Tabelle, dass die ursprünglichen Daten der Zieltabelle beibehalten werden, wenn die Operation fehlschlägt. Aus Sicherheitsgründen empfiehlt Snowflake in den meisten Fällen dringend die Verwendung einer Staging-Tabelle.
Damit der Konnektor eine Staging-Tabelle erstellen kann, muss der Benutzer, der die Kopieroperation (COPY) über den Spark-Konnektor ausführt, über ausreichende Berechtigungen zum Erstellen einer Tabelle verfügen. Das direkte Laden (d. h. Laden ohne Verwendung einer Staging-Tabelle) ist sinnvoll, wenn der Benutzer keine Berechtigung zum Erstellen einer Tabelle hat.
Die möglichen Werte dieses Parameters sind:
on
off
Wenn der Parameter
on
ist, wird eine Staging-Tabelle verwendet. Wenn dieser Parameteroff
ist, dann werden die Daten direkt in die Zieltabelle geladen.Dieser Parameter ist optional.
Der Standardwert dieses Parameters ist
on
(d. h. Stagingtabelle wird verwendet).
autopushdown
Dieser Parameter steuert, ob das automatische Abfrage-Pushdown aktiviert ist.
Wenn Pushdown aktiviert ist, wird beim Ausführen einer Abfrage in Spark ein Teil der Abfrageverarbeitung auf den Snowflake-Server „verschoben“. Dies verbessert die Performance einiger Abfragen.
Dieser Parameter ist optional.
Der Standardwert ist
on
, wenn der Konnektor mit einer kompatiblen Version von Spark verwendet wird. Andernfalls ist der Standardwertoff
.Wenn der Konnektor mit einer anderen Version von Spark verwendet wird, als für den Konnektor vorgesehen (z. B. wenn Version 3.2 des Konnektors mit Version 3.3 von Spark verwendet wird), dann ist der automatische Pushdown deaktiviert, auch wenn dieser Parameter auf
on
eingestellt ist.purge
Wenn diese Einstellung auf
on
gesetzt ist, löscht der Konnektor temporäre Dateien, die beim Übertragen von Spark nach Snowflake per externer Datenübertragung erstellt wurden. Wenn dieser Parameter aufoff
gesetzt ist, werden diese Dateien nicht automatisch vom Konnektor gelöscht.Das Bereinigen funktioniert nur bei Datenübertragungen von Spark nach Snowflake, nicht bei Datenübertragungen von Snowflake nach Spark.
Die möglichen Werte sind
on
off
Der Standardwert ist
off
.columnmap
Dieser Parameter ist nützlich, wenn Sie Daten von Spark nach Snowflake schreiben und die Spaltennamen in der Snowflake-Tabelle nicht mit den Spaltennamen in der Spark-Tabelle übereinstimmen. Sie können eine Zuordnung erstellen, die angibt, welche Spark-Quellspalte zu welcher Snowflake-Zielspalte gehört.
Der Parameter ist ein einzelnes Zeichenfolgenliteral in Form von:
"Map(col_2 -> col_b, col_3 -> col_a)"
Betrachten Sie beispielsweise das folgende Szenario:
Angenommen, ein DataFrame namens
df
in Spark hat drei Spalten:col_1
,col_2
,col_3
Außerdem hat eine Tabelle mit dem Namen
tb
in Snowflake zwei Spalten:col_a
,col_b
Sie möchten folgende Werte kopieren:
Von
df.col_2
nachtb.col_b
.Von
df.col_3
bistb.col_a
.
Der Wert des
columnmap
-Parameters wäre:Map(col_2 -> col_b, col_3 -> col_a)
Sie können diesen Wert generieren, indem Sie den folgenden Scala-Code ausführen:
Map("col_2"->"col_b","col_3"->"col_a").toString()
Der Standardwert dieses Parameters ist null. Mit anderen Worten, standardmäßig sollten die Spaltennamen in der Quell- und Zieltabelle übereinstimmen.
Dieser Parameter wird nur beim Schreiben von Spark nach Snowflake verwendet. Er gilt nicht beim Schreiben von Snowflake nach Spark.
keep_column_case
Beim Schreiben einer Tabelle von Spark nach Snowflake wandelt der Spark-Konnektor die Buchstaben der Spaltennamen standardmäßig in Großbuchstaben um, sofern die Spaltennamen nicht in doppelte Anführungszeichen gesetzt sind.
Beim Schreiben einer Tabelle von Snowflake nach Spark setzt der Spark-Konnektor standardmäßig alle Spaltennamen in Anführungszeichen, die keine Großbuchstaben, Unterstriche und Ziffern enthalten.
Wenn Sie „keep_column_case“ auf
on
setzen, nimmt der Spark-Konnektor diese Änderungen nicht vor.Die möglichen Werte sind:
on
off
Der Standardwert ist
off
.column_mapping
Der Konnektor muss Spalten aus dem Spark-DataFrame der Snowflake-Tabelle zuordnen. Dies kann anhand von Spaltennamen (unabhängig von der Reihenfolge) oder anhand der Spaltenreihenfolge erfolgen (d. h. die erste Spalte im DataFrame wird unabhängig vom Spaltennamen der ersten Spalte in der Tabelle zugeordnet).
Standardmäßig erfolgt die Zuordnung anhand der Reihenfolge. Sie können diese Einstellung überschreiben, indem Sie den Parameter auf
name
setzen, wodurch der Konnektor angewiesen wird, Spalten anhand von Spaltennamen zuzuordnen. (Bei der Namenszuordnung wird Groß-/Kleinschreibung nicht berücksichtigt.)Die möglichen Werte dieses Parameters sind:
order
name
Der Standardwert ist
order
.column_mismatch_behavior
Dieser Parameter gilt nur, wenn der Parameter
column_mapping
aufname
gesetzt ist.Wenn die Spaltennamen im Spark-DataFrame und in der Snowflake-Tabelle nicht übereinstimmen, gilt Folgendes:
Wenn
column_mismatch_behavior
auferror
gesetzt ist, meldet der Spark-Konnektor einen Fehler.Wenn
column_mismatch_behavior
ignore
ist, ignoriert der Spark-Konnektor den Fehler.Der Treiber verwirft jede Spalte im Spark-DataFrame, die keine entsprechende Spalte in der Snowflake-Tabelle enthält.
Der Treiber fügt NULLs in jede Spalte der Snowflake-Tabelle ein, die keine entsprechende Spalte im Spark-DataFrame enthält.
Mögliche Fehler sind:
Der Spark-DataFrame kann Spalten enthalten, die bis auf die Groß-/Kleinschreibung identisch sind. Da bei der Zuordnung von Spaltennamen die Groß-/Kleinschreibung nicht berücksichtigt wird, kann die korrekte Zuordnung vom DataFrame zur Tabelle nicht ermittelt werden.
Die Snowflake-Tabelle kann Spalten enthalten, die bis auf die Groß-/Kleinschreibung identisch sind. Da bei der Zuordnung von Spaltennamen die Groß-/Kleinschreibung nicht berücksichtigt wird, kann die korrekte Zuordnung vom DataFrame zur Tabelle nicht ermittelt werden.
Der Spark-DataFrame und die Snowflake-Tabelle weisen möglicherweise keine gemeinsamen Spaltennamen auf. Theoretisch könnte der Spark-Konnektor NULLs in jede Spalte jeder Zeile einfügen, doch ist dies normalerweise sinnlos, sodass der Konnektor auch dann einen Fehler ausgibt, wenn
column_mismatch_behavior
aufignore
gesetzt ist.
Die möglichen Werte dieses Parameters sind:
error
ignore
Der Standardwert ist
error
.time_output_format
Mit diesem Parameter kann der Benutzer das Format für die zurückgegebenen
TIME
-Daten angeben.Die möglichen Werte dieses Parameters sind die möglichen Werte für Zeitformate, die unter Uhrzeitformate angegeben sind.
Dieser Parameter wirkt sich nur auf die Ausgabe aus, nicht auf die Eingabe.
timestamp_ntz_output_format
, .timestamp_ltz_output_format
, .timestamp_tz_output_format
Diese Optionen geben das Ausgabeformat für Zeitstempelwerte an. Die Standardwerte für diese Optionen sind:
Konfigurationsoption
Standardwert
timestamp_ntz_output_format
"YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_ltz_output_format
"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_tz_output_format
"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"
Wenn diese Optionen auf
"sf_current"
gesetzt sind, verwendet der Konnektor die für die Sitzung angegebenen Formate.partition_size_in_mb
Dieser Parameter wird verwendet, wenn das Resultset der Abfrage sehr groß ist und in mehrere DataFrame-Partitionen aufgeteilt werden muss. Dieser Parameter gibt die empfohlene unkomprimierte Größe für jede DataFrame-Partition an. Erhöhen Sie diesen Wert, um die Anzahl der Partitionen zu verringern.
Dieser Wert ist die empfohlene Größe. Die tatsächliche Größe der Partitionen kann kleiner oder größer sein.
Diese Option gilt nur, wenn der Parameter use_copy_unload den Wert FALSE hat.
Dieser Parameter ist optional.
Der Standardwert ist
100
(MB).
use_copy_unload
Wenn der Wert
FALSE
ist, verwendet Snowflake das Arrow-Datenformat für die Auswahl von Daten mit SELECT. Wenn der WertTRUE
ist, kehrt Snowflake zum alten Verhalten des BefehlsCOPY UNLOAD
zurück, um ausgewählte Daten zu übertragen.Dieser Parameter ist optional.
Der Standardwert ist
FALSE
.treat_decimal_as_long
Bei
TRUE
wird der Spark-Konnektor so konfiguriert, dass er für Abfragen, die den TypDecimal(precision, 0)
zurückgeben,Long
-Werte (und nichtBigDecimal
-Werte) zurückgibt.Der Standardwert ist
FALSE
.Diese Option wurde in Version 2.11.1 des Spark-Konnektors hinzugefügt.
s3_stage_vpce_dns_name
Gibt den DNS-Namen Ihres VPC-Endpunkts für den Zugriff auf interne Stagingbereiche an.
Diese Option wurde in Version 2.11.1 des Spark-Konnektors hinzugefügt.
support_share_connection
Bei
FALSE
wird der Spark-Konnektor so konfiguriert, dass er für jeden Job und jede Aktion, bei dem/der für den Zugriff auf Snowflake dieselben Spark-Konnektor-Optionen verwendet werden, eine neue JDBC-Verbindung erstellt.Der Standardwert ist
TRUE
, was bedeutet, dass die verschiedenen Jobs und Aktionen die gleiche JDBC-Verbindung nutzen, wenn sie die gleichen Spark-Konnektor-Optionen für den Zugriff auf Snowflake verwenden.Wenn Sie diese Einstellung programmgesteuert aktivieren oder deaktivieren müssen, verwenden Sie die folgenden globalen statischen Funktionen:
SparkConnectorContext.disableSharedConnection()
SparkConnectorContext.enableSharingJDBCConnection()
Bemerkung
In den folgenden Sonderfällen verwendet der Spark-Konnektor keine gemeinsame JDBC-Verbindung:
Wenn vorherige oder nachfolgende Aktionen festgelegt sind und es sich bei denen nicht um CREATE TABLE, DROP TABLE oder MERGE INTO handelt, verwendet der Spark-Konnektor nicht die gemeinsam genutzte Verbindung.
Hilfsprogrammfunktionen in Utils wie
Utils.runQuery()
undUtils.getJDBCConnection()
verwenden nicht die gemeinsam genutzte Verbindung nicht.
Diese Option wurde in Version 2.11.2 des Spark-Konnektors hinzugefügt.
force_skip_pre_post_action_check_for_shared_session
Wenn
TRUE
, wird der Spark-Konnektor so konfiguriert, dass die Validierung von vorherigen und nachfolgenden Aktionen für die gemeinsame Nutzung von Sitzungen deaktiviert wird.Der Standardwert ist
FALSE
.Wichtig
Stellen Sie sich vor dem Einstellen dieser Option sicher, dass die Abfragen in den vorherigen und den nachfolgenden Aktionen keine Auswirkungen auf die Sitzungseinstellungen haben. Andernfalls könnten Probleme bei den Ergebnissen auftreten.
Diese Option wurde in Version 2.11.3 des Spark-Konnektors hinzugefügt.
Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation¶
Der Spark-Konnektor unterstützt Schlüsselpaar-Authentifizierung und Schlüsselrotation.
Im ersten Schritt führen Sie die Erstkonfiguration der Schlüsselpaar-Authentifizierung durch, wie unter Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation gezeigt.
Senden Sie eine unverschlüsselte Kopie des privaten Schlüssels mithilfe der Verbindungsoption
pem_private_key
.
Achtung
Aus Sicherheitsgründen sollten Sie diesen pem_private_key
-Schlüssel in Ihrer Anwendung nicht fest kodieren, sondern den Parameter dynamisch festlegen, nachdem Sie den Schlüssel aus einer sicheren Quelle gelesen haben. Wenn der Schlüssel verschlüsselt ist, entschlüsseln Sie ihn, und senden Sie die entschlüsselte Version.
Beachten Sie im Python-Beispiel, dass die pem_private_key
-Datei rsa_key.p8
:
direkt aus einer kennwortgeschützten Datei mit der Umgebungsvariablen
PRIVATE_KEY_PASSPHRASE
gelesen wird undden Ausdruck
pkb
in dersfOptions
-Zeichenfolge verwendet.
Um eine Verbindung herzustellen, können Sie das Python-Beispiel in einer Datei speichern (d. h. <file.py>
) und dann den folgenden Befehl ausführen:
spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>
Python
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os
with open("<path>/rsa_key.p8", "rb") as key_file:
p_key = serialization.load_pem_private_key(
key_file.read(),
password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
backend = default_backend()
)
pkb = p_key.private_bytes(
encoding = serialization.Encoding.PEM,
format = serialization.PrivateFormat.PKCS8,
encryption_algorithm = serialization.NoEncryption()
)
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")
sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')
sfOptions = {
"sfURL" : "<account_identifier>.snowflakecomputing.com",
"sfUser" : "<user_name>",
"pem_private_key" : pkb,
"sfDatabase" : "<database>",
"sfSchema" : "schema",
"sfWarehouse" : "<warehouse>"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("query", "COLORS") \
.load()
df.show()
Verwenden von External OAuth¶
Ab Spark-Konnektor, Version 2.7.0 können Sie External OAuth zum Authentifizieren bei Snowflake mittels Scala-Beispielprogramm oder Python-Beispielskript verwenden.
Bevor Sie External OAuth und den Spark-Konnektor zur Authentifizierung bei Snowflake verwenden, konfigurieren Sie eine External OAuth-Sicherheitsintegration für einen der unterstützten External OAuth-Autorisierungsserver oder einen kundenspezifischen Client für External OAuth.
Beachten Sie in den Scala- und Python-Beispielen, dass der Parameter sfPassword
durch die Parameter sfAuthenticator
und sfToken
ersetzt wird.
Scala:
// spark connector version val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME import org.apache.spark.sql.DataFrame var sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<username>", "sfAuthenticator" -> "oauth", "sfToken" -> "<external_oauth_access_token>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) // // Create a DataFrame from a Snowflake table // val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "region") .load() // // Join, augment, aggregate, etc. the data in Spark and then use the // Data Source API to write the data back to a table in Snowflake // df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t2") .mode(SaveMode.Overwrite) .save()
Python:
from pyspark import SparkConf, SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import * sc = SparkContext("local", "Simple App") spark = SQLContext(sc) spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>') # You might need to set these sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>") sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>") # Set options below sfOptions = { "sfURL" : "<account_identifier>.snowflakecomputing.com", "sfUser" : "<user_name>", "sfAuthenticator" : "oauth", "sfToken" : "<external_oauth_access_token>", "sfDatabase" : "<database>", "sfSchema" : "<schema>", "sfWarehouse" : "<warehouse>" } SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", "select 1 as my_num union all select 2 as my_num") \ .load() df.show()
AWS-Optionen für externe Datenübertragung¶
Diese Optionen werden verwendet, um den Amazon S3-Speicherort für die Speicherung temporärer Daten anzugeben und um Authentifizierungsdetails für den Zugriff auf den Speicherort bereitzustellen. Sie sind nur erforderlich, wenn Sie eine externe Datenübertragung durchführen. Externe Datenübertragungen sind erforderlich, wenn eine der folgenden Voraussetzungen erfüllt ist:
Sie verwenden die Version 2.1.x oder niedriger des Spark-Konnektors (der keine interne Übertragung unterstützt) oder
Ihre Übertragung dauert wahrscheinlich 36 Stunden oder länger (interne Übertragung verwendet temporäre Anmeldeinformationen, die nach 36 Stunden ablaufen).
tempDir
Der S3-Speicherort, an dem Daten zwischengespeichert werden (z. B.
s3n://xy12345-bucket/spark-snowflake-tmp/
).Wenn
tempDir
angegeben ist, müssen Sie auch eine der beiden Optionen angeben:awsAccessKey
,awsSecretKey
. odertemporary_aws_access_key_id
,temporary_aws_secret_access_key
,temporary_aws_session_token
awsAccessKey
,awsSecretKey
Dies sind standardmäßige AWS-Anmeldeinformationen, die den Zugriff auf den in
tempDir
angegebenen Speicherort erlauben. Beachten Sie, dass diese beiden Optionen zusammen festgelegt werden müssen.Wenn beide festgelegt sind, können sie aus dem vorhandenen
SparkContext
-Objekt abgerufen werden.Wenn Sie diese Variablen angeben, müssen Sie auch
tempDir
angeben.Diese Anmeldeinformationen sollten auch für den Hadoop-Cluster festgelegt werden.
temporary_aws_access_key_id
,temporary_aws_secret_access_key
,temporary_aws_session_token
Dies sind temporäre AWS-Anmeldeinformationen, die den Zugriff auf den in
tempDir
angegebenen Speicherort ermöglichen. Beachten Sie, dass alle drei Optionen zusammen festgelegt werden müssen.Wenn diese Optionen festgelegt sind, haben sie Vorrang vor den Optionen
awsAccessKey
undawsSecretKey
.Wenn Sie
temporary_aws_access_key_id
,temporary_aws_secret_access_key
undtemporary_aws_session_token
angeben, müssen Sie auchtempDir
angeben. Andernfalls werden diese Parameter ignoriert.check_bucket_configuration
Wenn auf
on
(Standard) gesetzt, prüft der Konnektor, ob für den Bucket, der zur Datenübertragung verwendet wird, eine Lebenszyklusrichtlinie konfiguriert ist (siehe Vorbereiten eines externen AWS S3-Buckets für weitere Informationen). Wenn keine Lebenszyklusrichtlinie vorhanden ist, wird eine Warnung protokolliert.Wenn Sie diese Option deaktivieren (durch Setzen auf
off
), wird diese Prüfung übersprungen. Dies kann hilfreich sein, wenn ein Benutzer auf die Bucketdatenoperationen zugreifen darf, nicht aber auf die Lebenszyklusrichtlinien des Buckets. Das Deaktivieren der Option kann auch die Ausführungszeiten von Abfragen etwas beschleunigen.
Weitere Details dazu finden Sie unter Authentifizierung von S3 für den Datenaustausch (unter diesem Thema).
Azure-Optionen für externe Datenübertragung¶
In diesem Abschnitt werden die Parameter beschrieben, die für Azure Blob-Speicher bei externen Datenübertragungen gelten. Externe Datenübertragungen sind erforderlich, wenn eine der folgenden Voraussetzungen erfüllt ist:
Sie verwenden die Version 2.1.x oder niedriger des Spark-Konnektors (der keine interne Übertragung unterstützt) oder
Ihre Übertragung dauert wahrscheinlich 36 Stunden oder länger (interne Übertragung verwendet temporäre Anmeldeinformationen, die nach 36 Stunden ablaufen).
Wenn Sie eine externe Übertragung mit Azure Blob-Speicher verwenden, geben Sie den Speicherort des Azure-Containers und die SAS (Freigabesignatur) für diesen Container mit den unten beschriebenen Parametern an.
tempDir
Der Azure Blob-Speichercontainer, in dem Daten zwischengespeichert werden. Dies hat die Form einer URL, zum Beispiel:
wasb://<Azure-Container>@<Azure-Konto>.<Azure-Endpunkt>/
temporary_azure_sas_token
Geben Sie das SAS-Token für den Azure Blob-Speicher an.
Weitere Details dazu finden Sie unter Authentifizieren von Azure für den Datenaustausch (unter diesem Thema).
Angeben von Azure-Informationen für temporären Speicher in Spark¶
Wenn Sie mit Azure Blob Storage einen temporären Speicher zum Übertragen von Daten zwischen Spark und Snowflake bereitstellen, müssen Sie neben dem Snowflake-Konnektor für Spark den Speicherort und die Anmeldeinformationen für den temporären Speicher angeben.
Um Spark die Angaben zum temporären Speicherort bereitzustellen, führen Sie in Ihrem Spark-Cluster beispielsweise folgende Befehle aus:
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Beachten Sie, dass der letzte Befehl die folgenden Variablen enthält:
<Container>
und<Konto>
: Dabei handelt es sich um den Container- und Kontoname für Ihre Azure-Bereitstellung.<Azure-Endpunkt>
: Dies ist der Endpunkt Ihres Azure-Bereitstellungsorts. Wenn Sie beispielsweise eine Azure US-Bereitstellung verwenden, ist der Endpunkt wahrscheinlichblob.core.windows.net
.<Azure-SAS>
: Dies ist das Shared Access Signature-Sicherheitstoken.
Ersetzen Sie jede dieser Variablen durch die passenden Informationen für Ihr Azure Blob Storage-Konto.
Übergeben von Snowflake-Sitzungsparametern als Optionen für den Konnektor¶
Der Snowflake-Konnektor für Spark unterstützt das Senden beliebiger Parameter auf Sitzungsebene an Snowflake (siehe Sitzungsparameter für weitere Informationen). Dies kann durch Hinzufügen eines ("<Schlüssel>" -> "<Wert>")
-Paares zum options
-Objekt erreicht werden, wobei <Schlüssel>
der Sitzungsparametername und <Wert>
der Wert ist.
Bemerkung
Der <Wert>
sollte eine Zeichenfolge sein, die in doppelte Anführungszeichen eingeschlossen ist, auch für Parameter, die Zahlen oder boolesche Werte akzeptieren (z. B. "1"
oder "true"
).
Im folgenden Codebeispiel wird der Sitzungsparameter USE_CACHED_RESULT mit dem Wert "false"
übergeben, wodurch die Verwendung der Ergebnisse zuvor ausgeführter Abfragen deaktiviert wird:
// ... assuming sfOptions contains Snowflake connector options
// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")
// ... now use sfOptions with the .options() method
Sicherheitshinweise¶
Kunden sollten sicherstellen, dass in einem Spark-System mit mehreren Knoten die Kommunikation zwischen den Knoten sicher ist. Der Spark-Master sendet Snowflake-Anmeldeinformationen an Spark-Worker, damit diese auf Snowflake-Stagingbereiche zugreifen können. Wenn die Kommunikation zwischen dem Spark-Master und den Spark-Worker nicht sicher ist, können die Anmeldeinformationen von einem nicht autorisierten Dritten gelesen werden.
Authentifizierung von S3 für den Datenaustausch¶
In diesem Abschnitt wird die Authentifizierung bei der Verwendung von S3 für den Datenaustausch beschrieben.
Die Ausführung dieser Aufgabe ist nur in einer der folgenden Situationen erforderlich:
Die Version des Snowflake-Konnektors für Spark ist 2.1.x (oder niedriger). Ab Version 2.2.0 verwendet der Konnektor für den Datenaustausch einen internen temporären Stagingbereich von Snowflake. Wenn Sie derzeit nicht die Version 2.2.0 (oder höher) des Konnektors verwenden, empfiehlt Snowflake dringend ein Upgrade auf die neueste Version.
Die Version des Snowflake-Konnektors für Spark ist 2.2.0 (oder höher), aber Ihre Jobs dauern regelmäßig länger als 36 Stunden. Dies ist die maximale Dauer für das AWS-Token, das der Konnektor verwendet, um auf den internen Stagingbereich für den Datenaustausch zuzugreifen.
Wenn Sie eine ältere Version des Konnektors verwenden, müssen Sie einen S3-Speicherort vorbereiten, den der Konnektor zum Datenaustausch zwischen Snowflake und Spark verwenden kann.
Um den Zugriff auf den S3-Bucket/das Verzeichnis für den Datenaustausch zwischen Spark und Snowflake zu ermöglichen (wie für tempDir
angegeben), werden zwei Authentifizierungsmethoden unterstützt:
Permanente AWS-Anmeldeinformationen (auch zur Konfiguration der Hadoop/Spark-Authentifizierung für den Zugriff auf S3 verwendet)
Temporäre AWS-Anmeldeinformationen
Verwenden von permanenten AWS-Anmeldeinformationen¶
Dies ist die Standard-Authentifizierungsmethode für AWS. Sie erfordert ein Paar aus awsAccessKey
- und awsSecretKey
-Werten.
Bemerkung
Diese Werte sollten auch verwendet werden, um Hadoop/Spark für den Zugriff auf S3 zu konfigurieren. Weitere Informationen dazu, einschließlich Beispiele, finden Sie unter Authentifizieren von Hadoop/Spark mit S3A oder S3N (unter diesem Thema).
Beispiel:
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>") // Then, configure your Snowflake environment // var sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<user_name>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>", "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"), "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"), "tempdir" -> "s3n://<temp-bucket-name>" )
Details zu den von sfOptions
unterstützten Optionen finden Sie unter AWS-Optionen für externe Datenübertragung (unter diesem Thema).
Authentifizieren von Hadoop/Spark mit S3A oder S3N¶
Hadoop/Spark-Ökosysteme unterstützen 2 URI-Schemas für den Zugriff auf S3:
s3a://
Neue, empfohlene Methode (für Hadoop 2.7 und höher)
Um diese Methode zu verwenden, ändern Sie die Scala-Beispiele unter diesem Thema, und fügen Sie dann die folgenden Hadoop-Konfigurationsoptionen hinzu:
val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3a.access.key", <accessKey>) hadoopConf.set("fs.s3a.secret.key", <secretKey>)
Stellen Sie sicher, dass die Option
tempdir
ebenfallss3a://
verwendet.s3n://
Ältere Methode (für Hadoop 2.6 und niedriger)
In einigen Systemen ist es notwendig, sie explizit anzugeben, wie im folgenden Scala-Beispiel gezeigt:
val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>) hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)
Verwenden von temporären AWS-Anmeldeinformationen¶
Bei dieser Methode werden die Konfigurationsoptionen temporary_aws_access_key_id
temporary_aws_secret_access_key
und temporary_aws_session_token
für den Konnektor verwendet.
Diese Methode bietet zusätzliche Sicherheit, da Snowflake nur temporären Zugriff auf den S3-Bucket bzw. das Verzeichnis für den Datenaustausch erhält.
Bemerkung
Temporäre Anmeldeinformationen können nur zur Konfiguration der S3-Authentifizierung für den Konnektor verwendet werden. Sie können nicht zur Konfiguration der Hadoop/Spark-Authentifizierung verwendet werden.
Wenn Sie temporäre Anmeldeinformationen angeben, haben diese Vorrang vor allen anderen permanenten Anmeldeinformationen.
Der folgende Scala-Code zeigt ein Beispiel für das Authentifizieren mit temporären Anmeldeinformationen:
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest import net.snowflake.spark.snowflake.Parameters // ... val sts_client = new AWSSecurityTokenServiceClient() val session_token_request = new GetSessionTokenRequest() // Set the token duration to 2 hours. session_token_request.setDurationSeconds(7200) val session_token_result = sts_client.getSessionToken(session_token_request) val session_creds = session_token_result.getCredentials() // Create a new set of Snowflake connector options, based on the existing // sfOptions definition, with additional temporary credential options that override // the credential options in sfOptions. // Note that constants from Parameters are used to guarantee correct // key names, but literal values, such as temporary_aws_access_key_id are, of course, // also allowed. var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId()) sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey()) sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())
sfOptions2
kann nun mit der options()
-DataFrame -Methode verwendet werden.
Authentifizieren von Azure für den Datenaustausch¶
In diesem Abschnitt wird beschrieben, wie Sie sich authentifizieren, wenn Sie den Azure Blob-Speicher für den Datenaustausch verwenden.
Die Authentifizierung auf diese Weise ist nur erforderlich, wenn einer der folgenden Umstände gilt:
Die Version des Snowflake-Konnektors für Spark ist 2.1.x (oder niedriger). Ab Version 2.2.0 verwendet der Konnektor für den Datenaustausch einen internen temporären Stagingbereich von Snowflake. Wenn Sie derzeit nicht die Version 2.2.0 (oder höher) des Konnektors verwenden, empfiehlt Snowflake dringend ein Upgrade auf die neueste Version.
Die Version des Snowflake-Konnektors für Spark ist 2.2.0 (oder höher), aber Ihre Jobs dauern regelmäßig länger als 36 Stunden. Dies ist die maximale Dauer für das Azure-Token, das vom Konnektor verwendet wird, um für den Datenaustausch auf den internen Stagingbereich zuzugreifen.
Sie müssen einen externen Azure Blob-Speichercontainer vorbereiten, über den der Konnektor Daten zwischen Snowflake und Spark austauschen kann.
Verwenden von Azure-Anmeldeinformationen¶
Dies ist die Standard-Authentifizierungsmethode für Azure Blob-Speicher. Sie erfordert ein Wertepaar aus tempDir
-Wert (eine URL) und temporary_azure_sas_token
-Wert.
Bemerkung
Diese Werte sollten auch verwendet werden, um Hadoop/Spark für den Zugriff auf Azure Blob-Speicher zu konfigurieren. Weitere Informationen dazu, einschließlich Beispiele, finden Sie unter Authentifizieren von Hadoop/Spark mit Azure (unter diesem Thema).
Beispiel:
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>) // Then, configure your Snowflake environment // val sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<user_name>", "sfPassword" -> "<password>", "sfDatabase" -> "<database_name>", "sfSchema" -> "<schema_name>", "sfWarehouse" -> "<warehouse_name>", "sfCompress" -> "on", "sfSSL" -> "on", "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/", "temporary_azure_sas_token" -> "<azure_sas>" )
Details zu den von sfOptions
unterstützten Optionen finden Sie unter Azure-Optionen für externe Datenübertragung (unter diesem Thema).
Authentifizieren von Hadoop/Spark mit Azure¶
Um diese Methode zu verwenden, ändern Sie die Scala-Beispiele unter diesem Thema, und fügen Sie dann die folgenden Hadoop-Konfigurationsoptionen hinzu:
val hadoopConf = sc.hadoopConfiguration sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Stellen Sie sicher, dass die Option tempdir
ebenfalls wasb://
verwendet.
Authentifizieren über einen Browser wird nicht unterstützt¶
Bei Verwendung des Spark-Konnektors ist es nicht praktisch, eine Form von Authentifizierung zu verwenden, bei der ein Browserfenster geöffnet wird, in dem der Benutzer zur Eingabe von Anmeldeinformationen aufgefordert wird. Dieses Fenster würde auf dem Clientcomputer nicht unbedingt angezeigt. Daher unterstützt der Spark-Konnektor keine Art von Authentifizierung, einschließlich MFA (Multi-Factor Authentication) oder SSO (Single Sign-On), bei der ein Fenster im Browser geöffnet werden würde.