Verwenden des Spark-Konnektors

Der Konnektor entspricht der Standard-API von Spark, bietet jedoch zusätzliche Snowflake-spezifische Optionen, die unter diesem Thema beschrieben werden.

COPY bezieht sich unter diesem Thema auf:

  • COPY INTO <Tabelle> (zur Übertragung von Daten aus einem internen oder externen Stagingbereich in eine Tabelle).

  • COPY INTO <Speicherort> (zur Übertragung von Daten aus einer Tabelle in einen internen oder externen Stagingbereich).

Unter diesem Thema:

Überprüfen der Netzwerkverbindung zu Snowflake mit SnowCD

Nach der Konfiguration des Treibers können Sie die Netzwerkkonnektivität zu Snowflake mit SnowCD testen und Probleme beheben.

Sie können während der Erstkonfiguration und bei Bedarf jederzeit SnowCD verwenden, um Ihre Netzwerkverbindung zu Snowflake zu testen und Probleme zu beheben.

Pushdown

Der Spark-Konnektor wendet Prädikat- und Abfrage-Pushdown an, indem die logischen Pläne von Spark für SQL-Operationen erfasst und analysiert werden. Wenn die Datenquelle Snowflake ist, werden die Operationen in eine SQL-Abfrage übersetzt und dann in Snowflake ausgeführt, um die Leistung zu verbessern.

Da für diese Übersetzung jedoch eine nahezu Eins-zu-Eins-Übersetzung der Spark-SQL-Operatoren in Snowflake-Ausdrücke erforderlich ist, können nicht alle Spark-SQL-Operatoren für Pushdown verwendet werden. Wenn Pushdown fehlschlägt, greift der Konnektor auf einen weniger optimierten Ausführungsplan zurück. Die nicht unterstützten Operationen werden stattdessen in Spark ausgeführt.

Bemerkung

Wenn Sie Pushdown für alle Operationen benötigen, sollten Sie Ihren Code so schreiben, dass dieser stattdessen Snowpark-API verwendet.

Nachfolgend finden Sie eine Liste der unterstützten Pushdown-Operationen (für alle unten aufgeführten Funktionen werden deren Spark-Namen verwendet). Wenn eine Funktion nicht in dieser Liste enthalten ist, wird möglicherweise ein Spark-Plan, der diese Funktion verwendet, auf Spark ausgeführt, anstatt den Pushdown zu Snowflake durchzuführen.

  • Aggregatfunktionen

    • Average

    • Corr

    • CovPopulation

    • CovSample

    • Count

    • Max

    • Min

    • StddevPop

    • StddevSamp

    • Sum

    • VariancePop

    • VarianceSamp

  • Boolesche Operatoren

    • And

    • Between

    • Contains

    • EndsWith

    • EqualTo

    • GreaterThan

    • GreaterThanOrEqual

    • In

    • IsNull

    • IsNotNull

    • LessThan

    • LessThanOrEqual

    • Not

    • Or

    • StartsWith

  • Funktionen für Datum, Uhrzeit und Zeitstempel

    • DateAdd

    • DateSub

    • Month

    • Quarter

    • TruncDate

    • TruncTimestamp

    • Year

  • Mathematische Funktionen

    • Arithmetische Operatoren „+“ (Addition), „-“ (Subtraktion), „*“ (Multiplikation), „/“ (Division) und „-“ (unäre Negation).

    • Abs

    • Acos

    • Asin

    • Atan

    • Ceil

    • CheckOverflow

    • Cos

    • Cosh

    • Exp

    • Floor

    • Greatest

    • Least

    • Log

    • Pi

    • Pow

    • PromotePrecision

    • Rand

    • Round

    • Sin

    • Sinh

    • Sqrt

    • Tan

    • Tanh

  • Verschiedene Operatoren

    • Alias (AS-Ausdrücke)

    • BitwiseAnd

    • BitwiseNot

    • BitwiseOr

    • BitwiseXor

    • CaseWhen

    • Cast(Unterelement, t, _)

    • Coalesce

    • If

    • MakeDecimal

    • ScalarSubquery

    • ShiftLeft

    • ShiftRight

    • SortOrder

    • UnscaledValue

  • Relationale Operatoren

    • Aggregatfunktionen und Group-By-Klauseln

    • Distinct

    • Filter

    • In

    • InSet

    • Joins

    • Einschränkungen

    • Projektionen

    • Sortierungen (ORDER BY)

    • Union und Union All

    • Fensterfunktionen und Fensterklauseln

  • Zeichenfolgenfunktionen

    • Ascii

    • Concat(Unterelemente)

    • Length

    • Like

    • Lower

    • StringLPad

    • StringRPad

    • StringTranslate

    • StringTrim

    • StringTrimLeft

    • StringTrimRight

    • Teilzeichenfolge

    • Upper

  • Fensterfunktionen (Hinweis: Diese funktionieren nicht mit Spark 2.2)

    • DenseRank

    • Rank

    • RowNumber

Verwenden des Konnektors in Scala

Angeben des Namens der Datenquellenklasse

Damit Snowflake als Datenquelle in Spark verwendet werden kann, geben Sie mit der Option .format den Namen der Snowflake-Konnektorklasse an, die die Datenquelle definiert.

net.snowflake.spark.snowflake

Um beim Kompilieren eine Prüfung des Klassennamens sicherzustellen, empfiehlt Snowflake dringend, eine Variable für den Klassennamen zu definieren. Beispiel:

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
Copy

Außerdem stellt die Klasse Utils die Variable zur Verfügung, die wie folgt importiert werden kann:

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
Copy

Bemerkung

Alle Beispiele unter diesem Thema verwenden SNOWFLAKE_SOURCE_NAME als Klassendefinition.

Aktivieren/Deaktivieren von Pushdown in einer Sitzung

Version 2.1.0 (und höher) des Konnektors unterstützt die Abfrageverschiebung (Pushdown), wodurch die Performance deutlich gesteigert wird. Dabei wird die Verarbeitung von Abfragen an Snowflake übergeben, wenn Snowflake die Spark-Datenquelle ist.

Standardmäßig ist Pushdown aktiviert.

Zur Deaktivierung des Pushdowns innerhalb einer Spark-Sitzung für einen bestimmten DataFrame:

  1. Nach der Instanziierung eines SparkSession-Objekts rufen Sie die statische Methode SnowflakeConnectorUtils.disablePushdownSession auf und übergeben das SparkSession Objekt. Beispiel:

    SnowflakeConnectorUtils.disablePushdownSession(spark)
    
    Copy

    Dabei ist spark Ihr SparkSession-Objekt.

  2. Erstellen Sie einen DataFrame mit der Option autopushdown, die auf off gesetzt ist. Beispiel:

    val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("query", query)
      .option("autopushdown", "off")
      .load()
    
    Copy

    Beachten Sie, dass Sie die Option autopushdown auch in einer Map festlegen können, die Sie dann an die options-Methode übergeben (z. B. in sfOptions im obigen Beispiel).

Um Pushdown nach der Deaktivierung wieder zu aktivieren, rufen Sie die statische Methode SnowflakeConnectorUtils.enablePushdownSession auf (mit Übergabe des Objekts SparkSession) und erstellen ein DataFrame mit aktiviertem autopushdown.

Verschieben von Daten von Snowflake zu Spark

Bemerkung

Wenn Sie DataFrames verwenden, unterstützt der Snowflake-Konnektor nur SELECT-Abfragen.

So lesen Sie Daten von Snowflake in einen Spark-DataFrame ein:

  1. Verwenden Sie die read()-Methode des SqlContext-Objekts, um einen DataFrameReader zu erstellen.

  2. Geben Sie SNOWFLAKE_SOURCE_NAME mit der Methode format() an. Weitere Informationen zur Definition finden Sie unter Angeben des Namens der Datenquellenklasse (unter diesem Thema).

  3. Geben Sie die Konnektoroptionen mit der Methode option() oder options() an. Weitere Informationen dazu finden Sie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema).

  4. Geben Sie eine der folgenden Optionen für die zu lesenden Tabellendaten an:

    • dbtable: Der Name der zu lesenden Tabelle. Alle Spalten und Datensätze werden abgerufen (d. h. es ist gleichbedeutend mit SELECT * FROM db_table).

    • query: Die genaue Abfrage (SELECT-Anweisung), die ausgeführt werden soll.

Nutzungshinweise

  • Derzeit unterstützt der Konnektor bei Verwendung von DataFrames keine anderen Typen von Abfragen (wie SHOW, DESC oder DML-Anweisungen).

  • Es gibt eine Obergrenze für die Größe einer einzelnen Zeile. Weitere Details dazu finden Sie unter Begrenzung der Abfragetextgröße.

Hinweise zur Performance

Wenn Sie Daten zwischen Snowflake und Spark übertragen, verwenden Sie die folgenden Methoden, um die Performance zu analysieren bzw. zu verbessern:

  • Verwenden Sie die net.snowflake.spark.snowflake.Utils.getLastSelect()-Methode, um die aktuelle Abfrage anzuzeigen, die beim Verschieben von Daten von Snowflake nach Spark ausgegeben wird.

  • Wenn Sie die Funktionalität filter oder where des Spark-DataFrame verwenden, überprüfen Sie, ob in der ausgeführten SQL-Abfrage die entsprechenden Filter vorhanden sind. Der Snowflake-Konnektor versucht, alle von Spark angeforderten Filter in SQL zu übersetzen.

    Es gibt jedoch Formen von Filtern, die die Spark-Infrastruktur derzeit nicht an den Snowflake-Konnektor übergibt. Infolgedessen wird in einigen Situationen eine große Anzahl unnötiger Datensätze von Snowflake angefordert.

  • Wenn Sie nur eine Teilmenge von Spalten benötigen, stellen Sie sicher, dass die Teilmenge in der SQL-Abfrage angegeben ist.

  • Im Allgemeinen, wenn die ausgegebene SQL-Abfrage nicht mit dem übereinstimmt, was Sie aufgrund der DataFrame-Operationen erwarten, verwenden Sie die Option query, um genau die gewünschte SQL-Syntax bereitzustellen.

Beispiele

Lesen einer vollständigen Tabelle:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()
Copy

Lesen der Ergebnisse einer Abfrage:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
    .load()
Copy

Verschieben von Daten von Spark in Snowflake

Die Schritte zum Speichern des Inhalts eines DataFrame in eine Snowflake-Tabelle sind ähnlich dem Schreiben von Snowflake in Spark:

  1. Verwenden Sie die write()-Methode des DataFrame, um einen DataFrameWriter zu erstellen.

  2. Geben Sie SNOWFLAKE_SOURCE_NAME mit der Methode format() an. Weitere Informationen zur Definition finden Sie unter Angeben des Namens der Datenquellenklasse (unter diesem Thema).

  3. Geben Sie die Konnektoroptionen mit der Methode option() oder options() an. Weitere Informationen dazu finden Sie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema).

  4. Verwenden Sie die Option dbtable, um die Tabelle anzugeben, in die die Daten geschrieben werden sollen.

  5. Verwenden Sie die Methode mode(), um den Speichermodus für den Inhalt festzulegen.

    Weitere Informationen dazu finden Sie unter SaveMode (Spark-Dokumentation).

Beispiele

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Exportieren von JSON-Daten von Spark in Snowflake

Spark-DataFrames können JSON-Objekte enthalten, die als Zeichenfolgen serialisiert sind. Der folgende Code zeigt ein Beispiel für die Konvertierung eines regulären DataFrame in einen DataFrame, der JSON-Daten enthält:

val rdd = myDataFrame.toJSON
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)
Copy

Beachten Sie, dass der resultierende jsonDataFrame eine einzelne Spalte vom Typ StringType ist. Wenn dieser DataFrame mit dem üblichen SaveMode.Overwrite-Modus in Snowflake exportiert wird, wird eine neue Tabelle in Snowflake mit einer einzelnen Spalte vom Typ VARCHAR erstellt.

So laden Sie einen jsonDataFrame in eine VARIANT-Spalte:

  1. Erstellen Sie eine Snowflake-Tabelle (Verbinden mit Snowflake in Java mithilfe des Snowflake-JDBC-Treibers). Erläuterungen zu den im Beispiel verwendeten Verbindungsparametern finden Sie unter Übersicht der Verbindungsparameter für den JDBC-Treiber.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class SnowflakeJDBCExample {
      public static void main(String[] args) throws Exception {
        String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.com/";
    
        Properties properties = new Properties();
        properties.put("user", "peter");
        properties.put("password", "test");
        properties.put("account", "myorganization-myaccount");
        properties.put("warehouse", "mywh");
        properties.put("db", "mydb");
        properties.put("schema", "public");
    
        // get connection
        System.out.println("Create JDBC connection");
        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
        System.out.println("Done creating JDBC connection\n");
        // create statement
        System.out.println("Create JDBC statement");
        Statement statement = connection.createStatement();
        System.out.println("Done creating JDBC statement\n");
        // create a table
        System.out.println("Create my_variant_table table");
        statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
        statement.close();
        System.out.println("Done creating demo table\n");
    
        connection.close();
        System.out.println("Close connection\n");
      }
    }
    
    Copy
  2. Um die vorhandene Tabelle wiederzuverwenden, verwenden Sie SaveMode.Append anstelle von SaveMode.Overwrite. Wenn der Zeichenfolgenwert für JSON in Snowflake geladen wird, weil die Zielspalte vom Typ VARIANT ist, wird er als JSON analysiert. Beispiel:

    df.write
        .format(SNOWFLAKE_SOURCE_NAME)
        .options(sfOptions)
        .option("dbtable", "my_variant_table")
        .mode(SaveMode.Append)
        .save()
    
    Copy

Ausführen von DDL/DML-SQL-Anweisungen

Verwenden Sie die runQuery()-Methode des Utils-Objekts, um zusätzlich zu Abfragen DDL/DML-SQL-Anweisungen auszuführen, z. B.:

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
    )
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")
Copy

wobei sfOptions die Parameterzuordnung ist, die zum Lesen/Schreiben von DataFrames verwendet wird.

Die Methode runQuery gibt nur TRUE oder FALSE zurück. Sie ist für Anweisungen gedacht, die kein Resultset zurückgeben, z. B. DDL-Anweisungen wie CREATE TABLE und DML-Anweisungen wie INSERT, UPDATE und DELETE. Das ist hilft nicht bei Anweisungen, die ein Resultset zurückgeben, z. B. SELECT oder SHOW.

Arbeiten mit Zeitstempeln und Zeitzonen

Spark bietet nur einen Typ von Zeitstempel, der dem Scala/Java-Zeitstempeltyp entspricht. Dieser ist im Verhalten nahezu identisch mit dem Datentyp TIMESTAMP_LTZ (lokale Zeitzone) von Snowflake. Daher empfiehlt Snowflake beim Übertragen von Daten zwischen Spark und Snowflake die folgenden Ansätze, um die Zeit im Verhältnis zu den Zeitzonen korrekt zu erhalten:

  • Verwenden Sie in Snowflake nur den Datentyp TIMESTAMP_LTZ.

    Bemerkung

    Der standardmäßig zugeordnete Datentyp für den Zeitstempel ist TIMESTAMP_NTZ (keine Zeitzone). Daher müssen Sie ausdrücklich den Parameter TIMESTAMP_TYPE_MAPPING festlegen, um TIMESTAMP_LTZ verwenden zu können.

  • Stellen Sie die Spark-Zeitzone auf UTC, und verwenden Sie diese Zeitzone in Snowflake (d. h. setzen Sie nicht die Option sfTimezone für den Konnektor, und setzen Sie nicht explizit eine Zeitzone in Snowflake). In diesem Szenario sind TIMESTAMP_LTZ und TIMESTAMP_NTZ praktisch gleichwertig.

    Zum Einstellen der Zeitzone fügen Sie Ihrem Spark-Code die folgende Zeile hinzu:

    java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
    
    Copy

Wenn Sie keinen dieser Ansätze implementieren, können unerwünschte Zeitänderungen auftreten. Betrachten Sie beispielsweise das folgende Szenario:

  • Die Zeitzone in Spark ist auf America/New_York eingestellt.

  • Die Zeitzone in Snowflake ist mit einer der beiden folgenden Methoden auf Europe/Warsaw eingestellt:

    • Setzen Sie sfTimezone für den Konnektor auf Europe/Warsaw.

    • Setzen Sie sfTimezone für den Konnektor auf snowflake und den Sitzungsparameter TIMEZONE in Snowflake auf Europe/Warsaw.

  • Sowohl TIMESTAMP_NTZ als auch TIMESTAMP_LTZ werden in Snowflake verwendet.

In diesem Szenario gilt:

  1. Wenn ein Wert, der 12:00:00 in einer TIMESTAMP_NTZ-Spalte in Snowflake repräsentiert, an Spark gesendet wird, enthält dieser Wert keine Zeitzoneninformationen. Spark behandelt den Wert als 12:00:00 in New York.

  2. Wenn Spark diesen Wert 12:00:00 (in New York) zurück an Snowflake sendet, um ihn in eine TIMESTAMP_LTZ-Spalte zu laden, wird er automatisch umgewandelt und als 18:00:00 (für die Zeitzone Warschau) geladen.

  3. Wenn dieser Wert dann in Snowflake in TIMESTAMP_NTZ umgewandelt wird, wird dem Benutzer 18:00:00 angezeigt, was sich vom ursprünglichen Wert 12:00:00 unterscheidet.

Zusammenfassend empfiehlt Snowflake, mindestens eine der folgenden Regeln strikt einzuhalten:

  • Verwenden Sie die gleiche Zeitzone, idealerweise UTC, sowohl für Spark als auch für Snowflake.

  • Verwenden Sie nur den Datentyp TIMESTAMP_LTZ für die Datenübertragung zwischen Spark und Snowflake.

Scala-Beispielprogramm

Wichtig

Dieses Beispielprogramm geht davon aus, dass Sie die Version 2.2.0 (oder höher) des Konnektors verwenden, der einen internen Snowflake-Stagingbereich zum Speichern temporärer Daten verwendet und daher keinen S3-Speicherort zum Speichern temporärer Daten benötigt. Wenn Sie eine frühere Version verwenden, müssen Sie über einen vorhandenen S3-Speicherort verfügen und Werte für tempdir, awsAccessKey, awsSecretKey für sfOptions angeben. Weitere Details dazu finden Sie unter AWS-Optionen für externe Datenübertragung (unter diesem Thema).

Das folgende Scala-Programm bietet einen vollständigen Anwendungsfall für den Snowflake-Konnektor für Spark. Ersetzen Sie vor der Verwendung des Codes die folgenden Zeichenfolgen durch die entsprechenden Werte, wie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema) beschrieben:

  • <Kontobezeichner>: Ihr Kontobezeichner.

  • <Benutzername>, <Kennwort>: Anmeldeinformationen für den Snowflake-Benutzer.

  • <Datenbank>, <Schema>, <Warehouse>: Standardwerte für die Snowflake-Sitzung.

Das Scala-Beispielprogramm verwendet die Basisauthentifizierung (d. h. Benutzername und Kennwort). Wenn Sie sich mit OAuth authentifizieren möchten, finden Sie die entsprechenden Informationen unter Verwenden von External OAuth (unter diesem Thema).

import org.apache.spark.sql._

//
// Configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) from t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Verwenden des Konnektors mit Python

Die Verwendung des Konnektors mit Python ist der Verwendung von Scala sehr ähnlich.

Wir empfehlen Ihnen, das in der Spark-Distribution enthaltene bin/pyspark-Skript zu verwenden.

Konfigurieren des pyspark-Skripts

Das pyspark-Skript muss ähnlich wie das spark-shell-Skript konfiguriert werden, wobei die Optionen --packages oder --jars verwendet werden. Beispiel:

bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3
Copy

Vergessen Sie nicht, den Snowflake-Konnektor für Spark und die .jar-Dateien des JDBC-Konnektors in die CLASSPATH-Umgebungsvariable aufzunehmen.

Weitere Informationen zur Konfiguration des spark-shell-Skripts finden Sie unter Schritt 4: Lokales Spark-Cluster oder die von Amazon EMR gehostete Spark-Umgebung konfigurieren.

Aktivieren/Deaktivieren von Pushdown in einer Sitzung

Version 2.1.0 (und höher) des Konnektors unterstützt die Abfrageverschiebung (Pushdown), wodurch die Performance deutlich gesteigert wird. Dabei wird die Verarbeitung von Abfragen an Snowflake übergeben, wenn Snowflake die Spark-Datenquelle ist.

Standardmäßig ist Pushdown aktiviert.

Zur Deaktivierung des Pushdowns innerhalb einer Spark-Sitzung für einen bestimmten DataFrame:

  1. Nach der Instanziierung eines SparkSession-Objekts rufen Sie die statische Methode SnowflakeConnectorUtils.disablePushdownSession auf und übergeben das SparkSession Objekt. Beispiel:

    sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
    
    Copy
  2. Erstellen Sie einen DataFrame mit der Option autopushdown, die auf off gesetzt ist. Beispiel:

    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sfOptions) \
      .option("query",  query) \
      .option("autopushdown", "off") \
      .load()
    
    Copy

    Beachten Sie, dass Sie die Option autopushdown auch in einem Dictionary festlegen können, das Sie dann an die options-Methode übergeben (z. B. in sfOptions im obigen Beispiel).

Um Pushdown nach der Deaktivierung wieder zu aktivieren, rufen Sie die statische Methode SnowflakeConnectorUtils.enablePushdownSession auf (mit Übergabe des Objekts SparkSession) und erstellen ein DataFrame mit aktiviertem autopushdown.

Beispiel für ein Python-Skript

Wichtig

Dieses Beispielskript geht davon aus, dass Sie die Version 2.2.0 (oder höher) des Konnektors verwenden, der einen internen Snowflake-Stagingbereich zum Speichern temporärer Daten verwendet und daher keinen S3-Speicherort zum Speichern dieser Daten benötigt. Wenn Sie eine frühere Version verwenden, müssen Sie über einen vorhandenen S3-Speicherort verfügen und Werte für tempdir, awsAccessKey, awsSecretKey für sfOptions angeben. Weitere Details dazu finden Sie unter AWS-Optionen für externe Datenübertragung (unter diesem Thema).

Nachdem das pyspark-Skript konfiguriert wurde, können Sie SQL-Abfragen und andere Operationen ausführen. Hier ist ein Beispiel für ein Python-Skript, das eine einfache SQL-Abfrage ausführt. Dieses Skript veranschaulicht die grundlegende Verwendung des Konnektors. Die meisten Scala-Beispiele in diesem Dokument können mit minimalem Aufwand für die Verwendung mit Python angepasst werden.

Das Python-Beispielskript verwendet die Basisauthentifizierung (d. h. Benutzername und Kennwort). Wenn Sie sich mit OAuth authentifizieren möchten, finden Sie die entsprechenden Informationen unter Verwenden von External OAuth (unter diesem Thema).

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()
Copy

Tipp

Beachten Sie die Verwendung von sfOptions und SNOWFLAKE_SOURCE_NAME. Dies vereinfacht den Code und reduziert die Wahrscheinlichkeit von Fehlern.

Details zu den unterstützten Optionen für sfOptions finden Sie unter Einstellen der Konfigurationsoptionen für den Konnektor (unter diesem Thema).

Zuordnen von Datentypen

Der Spark-Konnektor unterstützt die Konvertierung zwischen vielen gängigen Datentypen.

Von Spark SQL zu Snowflake

Spark-Datentyp

Snowflake-Datentyp

ArrayType

VARIANT

BinaryType

Nicht unterstützt

BooleanType

BOOLEAN

ByteType

INTEGER. Snowflake unterstützt den Typ BYTE nicht.

DateType

DATE

DecimalType

DECIMAL

DoubleType

DOUBLE

FloatType

FLOAT

IntegerType

INTEGER

LongType

INTEGER

MapType

VARIANT

ShortType

INTEGER

StringType

Wenn Länge angegeben ist, VARCHAR(N); andernfalls VARCHAR

StructType

VARIANT

TimestampType

TIMESTAMP

Von Snowflake zu Spark SQL

Snowflake-Datentyp

Spark-Datentyp

ARRAY

StringType

BIGINT

DecimalType(38, 0)

BINARY

Nicht unterstützt

BLOB

Nicht unterstützt

BOOLEAN

BooleanType

CHAR

StringType

CLOB

StringType

DATE

DateType

DECIMAL

DecimalType

DOUBLE

DoubleType

FLOAT

DoubleType

INTEGER

DecimalType(38, 0)

OBJECT

StringType

TIMESTAMP

TimestampType

TIME

StringType (Spark-Konnektor-Version 2.4.14 oder höher)

VARIANT

StringType

Aufrufen der Methode DataFrame.show

Wenn Sie die Methode DataFrame.show aufrufen und eine Zahl übergeben, die kleiner ist als die Anzahl der Zeilen im DataFrame, erstellen Sie ein DataFrame, das nur die Zeilen enthält, die in sortierter Reihenfolge angezeigt werden sollen.

Gehen Sie dabei wie folgt vor:

  1. Rufen Sie zuerst die Methode sort auf, um einen DataFrame zurückzugeben, der sortierte Zeilen enthält.

  2. Rufen Sie die Methode limit auf diesem DataFrame auf, um einen DataFrame zurückzugeben, der nur die Zeilen enthält, die Sie anzeigen möchten.

  3. Rufen Sie die Methode show auf dem zurückgegebenen DataFrame auf.

So können Sie beispielsweise 5 Zeilen anzeigen und die Ergebnisse nach der Spalte my_col sortieren:

val dfWithRowsToShow = originalDf.sort("my_col").limit(5)
dfWithRowsToShow.show(5)
Copy

Wenn Sie andernfalls show aufrufen, um eine Teilmenge von Zeilen in DataFrame anzuzeigen, könnten verschiedene Ausführungen des Codes dazu führen, dass unterschiedliche Zeilen angezeigt werden.

Einstellen der Konfigurationsoptionen für den Konnektor

In den folgenden Abschnitten werden die Optionen aufgeführt, mit denen Sie das Verhalten des Konnektors konfigurieren können:

Um diese Optionen festzulegen, rufen Sie die Methode .option(<Schlüssel>, <Wert>) oder die .options(<Zuordnung>)-Methode der Klasse Spark DataframeReader auf.

Tipp

Um die Verwendung der Optionen zu vereinfachen, empfiehlt Snowflake, die Optionen in einem einzigen Map-Objekt anzugeben und .options(<Zuordnung>) aufzurufen, um die Optionen zu festzulegen.

Erforderliche Verbindungsoptionen

Die folgenden Optionen sind für die Verbindung mit Snowflake erforderlich:

sfUrl

Gibt den Hostnamen für Ihr Konto im folgenden Format an:

account_identifier.snowflakecomputing.com

account_identifier ist ihr Kontobezeichner.

sfUser

Anmeldename für den Snowflake-Benutzer.

Außerdem müssen Sie zur Authentifizierung eine der folgenden Optionen verwenden:

  • sfPassword

    Kennwort für den Snowflake-Benutzer.

  • pem_private_key

    Privater Schlüssel (im PEM-Format) für die Schlüsselpaar-Authentifizierung. Eine Anleitung dazu finden Sie unter Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation.

  • sfAuthenticator

    Gibt an, dass zum Authentifizieren bei Snowflake External OAuth verwendet wird. Setzen Sie den Wert auf oauth.

    Für die Verwendung von External OAuth muss der Parameter sfToken festgelegt werden.

sfToken

(Erforderlich bei Verwendung von External OAuth) Setzen Sie den Wert auf Ihr External OAuth-Zugriffstoken.

Für diesen Verbindungsparameter muss der Parameterwert sfAuthenticator auf oauth gesetzt werden.

Der Standardwert ist „none“.

Erforderliche Kontextoptionen

Die folgenden Optionen sind erforderlich, um den Datenbank- und Schemakontext für die Sitzung festzulegen:

sfDatabase

Die Datenbank, die nach dem Verbinden für die Sitzung verwendet werden soll.

sfSchema

Das Schema, das nach dem Verbinden für die Sitzung verwendet werden soll.

Zusätzliche Kontextoptionen

Die in diesem Abschnitt aufgeführten Optionen sind nicht erforderlich.

sfAccount

Kontobezeichner (z. B. myorganization-myaccount). Diese Option ist nicht mehr erforderlich, da der Kontobezeichner in sfUrl angegeben ist. Sie ist hier nur aus Gründen der Abwärtskompatibilität dokumentiert.

sfWarehouse

Das standardmäßige virtuelle Warehouse, das nach dem Verbinden für die Sitzung verwendet werden soll.

sfRole

Die Standardsicherheitsrolle, die nach dem Verbinden für die Sitzung verwendet werden soll.

Proxy-Optionen

Die in diesem Abschnitt aufgeführten Optionen sind nicht erforderlich.

use_proxy

Gibt an, ob der Konnektor einen Proxy verwenden soll:

  • true gibt an, dass der Konnektor einen Proxy verwenden soll.

  • false gibt an, dass der Konnektor keinen Proxy verwenden soll.

Der Standardwert ist false.

proxy_host

(Erforderlich, wenn use_proxy den Wert true hat) Gibt den Hostnamen des zu verwendenden Proxyservers an.

proxy_port

(Erforderlich, wenn use_proxy den Wert true hat) Gibt die Portnummer des zu verwendenden Proxyservers an.

proxy_protocol

Gibt das Protokoll an, das für die Verbindung zum Proxyserver verwendet wird. Sie können einen der folgenden Werte angeben:

  • http

  • https

Der Standardwert ist http.

Dies wird nur für Snowflake auf AWS unterstützt.

Diese Option wurde in Version 2.11.1 des Spark-Konnektors hinzugefügt.

proxy_user

Gibt den Benutzernamen für die Authentifizierung beim Proxyserver an. Legen Sie dies fest, wenn der Proxyserver eine Authentifizierung erfordert.

Dies wird nur für Snowflake auf AWS unterstützt.

proxy_password

Gibt das Kennwort von proxy_user für die Authentifizierung beim Proxyserver an. Legen Sie dies fest, wenn der Proxyserver eine Authentifizierung erfordert.

Dies wird nur für Snowflake auf AWS unterstützt.

non_proxy_hosts

Gibt die Liste der Hosts an, mit denen sich der Konnektor unter Umgehung des Proxyservers direkt verbinden soll.

Verwenden Sie das URL-Escapezeichen %7C (Pipe-Symbol), um den Hostnamen zu separieren. Sie können auch ein Sternchen (*) als Platzhalter verwenden.

Dies wird nur für Snowflake auf AWS unterstützt.

Zusätzliche Optionen

Die in diesem Abschnitt aufgeführten Optionen sind nicht erforderlich.

sfTimezone

Die Zeitzone, die Snowflake bei der Verwendung von Spark verwenden soll. Beachten Sie, dass der Parameter nur die Zeitzone in Snowflake festlegt. Die Spark-Umgebung bleibt unverändert. Folgende Werte werden unterstützt:

  • spark: Verwendet die Zeitzone von Spark (Standard).

  • snowflake: Verwendet die aktuelle Zeitzone für Snowflake.

  • sf_default: Verwendet die Standardzeitzone des Snowflake-Benutzers, der sich verbindet.

  • time_zone: Verwenden Sie eine bestimmte Zeitzone (z. B. America/New_York), falls gültig.

    Weitere Informationen zu den Auswirkungen dieser Option finden Sie unter Arbeiten mit Zeitstempeln und Zeitzonen (unter diesem Thema).

sfCompress

Wenn auf on (Standard) eingestellt, werden die zwischen Snowflake und Spark übermittelten Daten komprimiert.

s3MaxFileSize

Die Größe der Datei, die beim Verschieben von Daten von Snowflake nach Spark verwendet wird. Die Voreinstellung ist 10 MB.

preactions

Eine durch Semikolon getrennte Liste von SQL-Befehlen, die ausgeführt werden, bevor Daten zwischen Spark und Snowflake übertragen werden.

Wenn ein SQL-Befehl %s enthält, wird %s durch den Tabellennamen ersetzt, auf den für die Operation verwiesen wird.

postactions

Eine durch Semikolon getrennte Liste von SQL-Befehlen, die ausgeführt werden, nachdem Daten zwischen Spark und Snowflake übertragen wurden.

Wenn ein SQL-Befehl %s enthält, wird dies durch den Tabellennamen ersetzt, auf den für die Operation verwiesen wird.

truncate_columns

Wenn auf on (Standard) gesetzt, werden beim COPY-Befehl Textzeichenfolgen automatisch abgeschnitten, die die Länge der Zielspalte überschreiten. Wenn auf off gesetzt, erstellt der Befehl beim Überschreiten der Länge der Zielspalte einen Fehler.

truncate_table

Dieser Parameter steuert, ob Snowflake das Schema einer Snowflake-Zieltabelle beim Überschreiben dieser Tabelle beibehalten wird.

Wenn eine Zieltabelle in Snowflake überschrieben wird, wird standardmäßig auch das Schema dieser Zieltabelle überschrieben. Das neue Schema basiert auf dem Schema der Quelltabelle (dem Spark-DataFrame).

Manchmal ist das Schema der Quelle jedoch nicht ideal. Ein Benutzer kann beispielsweise wünschen, dass eine Snowflake-Zieltabelle in Zukunft FLOAT-Werte speichern kann, obwohl der Datentyp der ursprünglichen Quellspalte INTEGER ist. In diesem Fall sollte das Schema der Snowflake-Tabelle nicht überschrieben werden. Die Snowflake-Tabelle sollte lediglich abgeschnitten und dann mit ihrem aktuellen Schema wiederverwendet werden.

Die möglichen Werte dieses Parameters sind:

  • on

  • off

Wenn dieser Parameter on ist, wird das ursprüngliche Schema der Zieltabelle beibehalten. Wenn dieser Parameter off ist, dann wird das alte Schema der Tabelle ignoriert und ein neues Schema basierend auf dem Schema der Quelle erzeugt.

Dieser Parameter ist optional.

Der Standardwert dieses Parameters ist off (d. h. standardmäßig wird das ursprüngliche Tabellenschema überschrieben).

Weitere Details zum Zuordnen von Spark-Datentypen zu Snowflake-Datentypen (und umgekehrt) finden Sie unter Zuordnen von Datentypen (unter diesem Thema).

continue_on_error

Diese Variable steuert, ob der COPY-Befehl abgebrochen wird, wenn der Benutzer ungültige Daten eingibt (z. B. ungültiges JSON-Format für eine Spalte vom Datentyp Variant).

Die möglichen Werte sind:

  • on

  • off

Der Wert on bedeutet, dass bei Auftreten eines Fehlers der Vorgang fortgesetzt wird. Der Wert off bedeutet, dass bei Auftreten eines Fehlers der Vorgang abgebrochen wird.

Dieser Parameter ist optional.

Der Standardwert dieses Parameters ist off.

Es wird nicht empfohlen, diese Option zu aktivieren. Wenn beim Kopieren (COPY) in Snowflake mit dem Spark-Konnektor ein Fehler gemeldet wurde, dann führt dies wahrscheinlich zu fehlenden Daten.

Bemerkung

Wenn Zeilen abgelehnt werden oder fehlen und diese Zeilen in der Eingabequelle nicht eindeutig fehlerhaft sind, wenden Sie sich an Snowflake.

usestagingtable

Dieser Parameter steuert, ob das Laden von Daten über eine Stagingtabelle erfolgt.

Eine Staging-Tabelle ist eine normale Tabelle (mit einem temporären Namen), die vom Konnektor erstellt wird. Wenn die Datenladeoperation erfolgreich ist, wird die ursprüngliche Zieltabelle gelöscht, und die Staging-Tabelle in den Namen der ursprünglichen Zieltabelle umbenannt. Wenn die Datenladeoperation fehlschlägt, wird die Staging-Tabelle gelöscht, und die Zieltabelle wird mit den Daten belassen, die sie unmittelbar vor der Operation hatte. Somit ermöglicht die Staging-Tabelle, dass die ursprünglichen Daten der Zieltabelle beibehalten werden, wenn die Operation fehlschlägt. Aus Sicherheitsgründen empfiehlt Snowflake in den meisten Fällen dringend die Verwendung einer Staging-Tabelle.

Damit der Konnektor eine Staging-Tabelle erstellen kann, muss der Benutzer, der die Kopieroperation (COPY) über den Spark-Konnektor ausführt, über ausreichende Berechtigungen zum Erstellen einer Tabelle verfügen. Das direkte Laden (d. h. Laden ohne Verwendung einer Staging-Tabelle) ist sinnvoll, wenn der Benutzer keine Berechtigung zum Erstellen einer Tabelle hat.

Die möglichen Werte dieses Parameters sind:

  • on

  • off

Wenn der Parameter on ist, wird eine Staging-Tabelle verwendet. Wenn dieser Parameter off ist, dann werden die Daten direkt in die Zieltabelle geladen.

Dieser Parameter ist optional.

Der Standardwert dieses Parameters ist on (d. h. Stagingtabelle wird verwendet).

autopushdown

Dieser Parameter steuert, ob das automatische Abfrage-Pushdown aktiviert ist.

Wenn Pushdown aktiviert ist, wird beim Ausführen einer Abfrage in Spark ein Teil der Abfrageverarbeitung auf den Snowflake-Server „verschoben“. Dies verbessert die Performance einiger Abfragen.

Dieser Parameter ist optional.

Der Standardwert ist on, wenn der Konnektor mit einer kompatiblen Version von Spark verwendet wird. Andernfalls ist der Standardwert off.

Wenn der Konnektor mit einer anderen Version von Spark verwendet wird, als für den Konnektor vorgesehen (z. B. wenn Version 3.2 des Konnektors mit Version 3.3 von Spark verwendet wird), dann ist der automatische Pushdown deaktiviert, auch wenn dieser Parameter auf on eingestellt ist.

purge

Wenn diese Einstellung auf on gesetzt ist, löscht der Konnektor temporäre Dateien, die beim Übertragen von Spark nach Snowflake per externer Datenübertragung erstellt wurden. Wenn dieser Parameter auf off gesetzt ist, werden diese Dateien nicht automatisch vom Konnektor gelöscht.

Das Bereinigen funktioniert nur bei Datenübertragungen von Spark nach Snowflake, nicht bei Datenübertragungen von Snowflake nach Spark.

Die möglichen Werte sind

  • on

  • off

Der Standardwert ist off.

columnmap

Dieser Parameter ist nützlich, wenn Sie Daten von Spark nach Snowflake schreiben und die Spaltennamen in der Snowflake-Tabelle nicht mit den Spaltennamen in der Spark-Tabelle übereinstimmen. Sie können eine Zuordnung erstellen, die angibt, welche Spark-Quellspalte zu welcher Snowflake-Zielspalte gehört.

Der Parameter ist ein einzelnes Zeichenfolgenliteral in Form von:

"Map(col_2 -> col_b, col_3 -> col_a)"

Betrachten Sie beispielsweise das folgende Szenario:

  • Angenommen, ein DataFrame namens df in Spark hat drei Spalten:

    col_1, col_2, col_3

  • Außerdem hat eine Tabelle mit dem Namen tb in Snowflake zwei Spalten:

    col_a, col_b

  • Sie möchten folgende Werte kopieren:

    • Von df.col_2 nach tb.col_b.

    • Von df.col_3 bis tb.col_a.

Der Wert des columnmap-Parameters wäre:

Map(col_2 -> col_b, col_3 -> col_a)

Sie können diesen Wert generieren, indem Sie den folgenden Scala-Code ausführen:

Map("col_2"->"col_b","col_3"->"col_a").toString()

Der Standardwert dieses Parameters ist null. Mit anderen Worten, standardmäßig sollten die Spaltennamen in der Quell- und Zieltabelle übereinstimmen.

Dieser Parameter wird nur beim Schreiben von Spark nach Snowflake verwendet. Er gilt nicht beim Schreiben von Snowflake nach Spark.

keep_column_case

Beim Schreiben einer Tabelle von Spark nach Snowflake wandelt der Spark-Konnektor die Buchstaben der Spaltennamen standardmäßig in Großbuchstaben um, sofern die Spaltennamen nicht in doppelte Anführungszeichen gesetzt sind.

Beim Schreiben einer Tabelle von Snowflake nach Spark setzt der Spark-Konnektor standardmäßig alle Spaltennamen in Anführungszeichen, die keine Großbuchstaben, Unterstriche und Ziffern enthalten.

Wenn Sie „keep_column_case“ auf on setzen, nimmt der Spark-Konnektor diese Änderungen nicht vor.

Die möglichen Werte sind:

  • on

  • off

Der Standardwert ist off.

column_mapping

Der Konnektor muss Spalten aus dem Spark-DataFrame der Snowflake-Tabelle zuordnen. Dies kann anhand von Spaltennamen (unabhängig von der Reihenfolge) oder anhand der Spaltenreihenfolge erfolgen (d. h. die erste Spalte im DataFrame wird unabhängig vom Spaltennamen der ersten Spalte in der Tabelle zugeordnet).

Standardmäßig erfolgt die Zuordnung anhand der Reihenfolge. Sie können diese Einstellung überschreiben, indem Sie den Parameter auf name setzen, wodurch der Konnektor angewiesen wird, Spalten anhand von Spaltennamen zuzuordnen. (Bei der Namenszuordnung wird Groß-/Kleinschreibung nicht berücksichtigt.)

Die möglichen Werte dieses Parameters sind:

  • order

  • name

Der Standardwert ist order.

column_mismatch_behavior

Dieser Parameter gilt nur, wenn der Parameter column_mapping auf name gesetzt ist.

Wenn die Spaltennamen im Spark-DataFrame und in der Snowflake-Tabelle nicht übereinstimmen, gilt Folgendes:

  • Wenn column_mismatch_behavior auf error gesetzt ist, meldet der Spark-Konnektor einen Fehler.

  • Wenn column_mismatch_behavior ignore ist, ignoriert der Spark-Konnektor den Fehler.

    • Der Treiber verwirft jede Spalte im Spark-DataFrame, die keine entsprechende Spalte in der Snowflake-Tabelle enthält.

    • Der Treiber fügt NULLs in jede Spalte der Snowflake-Tabelle ein, die keine entsprechende Spalte im Spark-DataFrame enthält.

Mögliche Fehler sind:

  • Der Spark-DataFrame kann Spalten enthalten, die bis auf die Groß-/Kleinschreibung identisch sind. Da bei der Zuordnung von Spaltennamen die Groß-/Kleinschreibung nicht berücksichtigt wird, kann die korrekte Zuordnung vom DataFrame zur Tabelle nicht ermittelt werden.

  • Die Snowflake-Tabelle kann Spalten enthalten, die bis auf die Groß-/Kleinschreibung identisch sind. Da bei der Zuordnung von Spaltennamen die Groß-/Kleinschreibung nicht berücksichtigt wird, kann die korrekte Zuordnung vom DataFrame zur Tabelle nicht ermittelt werden.

  • Der Spark-DataFrame und die Snowflake-Tabelle weisen möglicherweise keine gemeinsamen Spaltennamen auf. Theoretisch könnte der Spark-Konnektor NULLs in jede Spalte jeder Zeile einfügen, doch ist dies normalerweise sinnlos, sodass der Konnektor auch dann einen Fehler ausgibt, wenn column_mismatch_behavior auf ignore gesetzt ist.

Die möglichen Werte dieses Parameters sind:

  • error

  • ignore

Der Standardwert ist error.

time_output_format

Mit diesem Parameter kann der Benutzer das Format für die zurückgegebenen TIME-Daten angeben.

Die möglichen Werte dieses Parameters sind die möglichen Werte für Zeitformate, die unter Uhrzeitformate angegeben sind.

Dieser Parameter wirkt sich nur auf die Ausgabe aus, nicht auf die Eingabe.

timestamp_ntz_output_format, . timestamp_ltz_output_format, . timestamp_tz_output_format

Diese Optionen geben das Ausgabeformat für Zeitstempelwerte an. Die Standardwerte für diese Optionen sind:

Konfigurationsoption

Standardwert

timestamp_ntz_output_format

"YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_ltz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_tz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

Wenn diese Optionen auf "sf_current" gesetzt sind, verwendet der Konnektor die für die Sitzung angegebenen Formate.

partition_size_in_mb

Dieser Parameter wird verwendet, wenn das Resultset der Abfrage sehr groß ist und in mehrere DataFrame-Partitionen aufgeteilt werden muss. Dieser Parameter gibt die empfohlene unkomprimierte Größe für jede DataFrame-Partition an. Erhöhen Sie diesen Wert, um die Anzahl der Partitionen zu verringern.

Dieser Wert ist die empfohlene Größe. Die tatsächliche Größe der Partitionen kann kleiner oder größer sein.

Diese Option gilt nur, wenn der Parameter use_copy_unload den Wert FALSE hat.

Dieser Parameter ist optional.

Der Standardwert ist 100 (MB).

use_copy_unload

Wenn der Wert FALSE ist, verwendet Snowflake das Arrow-Datenformat für die Auswahl von Daten mit SELECT. Wenn der Wert TRUE ist, kehrt Snowflake zum alten Verhalten des Befehls COPY UNLOAD zurück, um ausgewählte Daten zu übertragen.

Dieser Parameter ist optional.

Der Standardwert ist FALSE.

treat_decimal_as_long

Bei TRUE wird der Spark-Konnektor so konfiguriert, dass er für Abfragen, die den Typ Decimal(precision, 0) zurückgeben, Long-Werte (und nicht BigDecimal-Werte) zurückgibt.

Der Standardwert ist FALSE.

Diese Option wurde in Version 2.11.1 des Spark-Konnektors hinzugefügt.

s3_stage_vpce_dns_name

Gibt den DNS-Namen Ihres VPC-Endpunkts für den Zugriff auf interne Stagingbereiche an.

Diese Option wurde in Version 2.11.1 des Spark-Konnektors hinzugefügt.

support_share_connection

Bei FALSE wird der Spark-Konnektor so konfiguriert, dass er für jeden Job und jede Aktion, bei dem/der für den Zugriff auf Snowflake dieselben Spark-Konnektor-Optionen verwendet werden, eine neue JDBC-Verbindung erstellt.

Der Standardwert ist TRUE, was bedeutet, dass die verschiedenen Jobs und Aktionen die gleiche JDBC-Verbindung nutzen, wenn sie die gleichen Spark-Konnektor-Optionen für den Zugriff auf Snowflake verwenden.

Wenn Sie diese Einstellung programmgesteuert aktivieren oder deaktivieren müssen, verwenden Sie die folgenden globalen statischen Funktionen:

  • SparkConnectorContext.disableSharedConnection()

  • SparkConnectorContext.enableSharingJDBCConnection()

Bemerkung

In den folgenden Sonderfällen verwendet der Spark-Konnektor keine gemeinsame JDBC-Verbindung:

  • Wenn vorherige oder nachfolgende Aktionen festgelegt sind und es sich bei denen nicht um CREATE TABLE, DROP TABLE oder MERGE INTO handelt, verwendet der Spark-Konnektor nicht die gemeinsam genutzte Verbindung.

  • Hilfsprogrammfunktionen in Utils wie Utils.runQuery() und Utils.getJDBCConnection() verwenden nicht die gemeinsam genutzte Verbindung nicht.

Diese Option wurde in Version 2.11.2 des Spark-Konnektors hinzugefügt.

force_skip_pre_post_action_check_for_shared_session

Wenn TRUE, wird der Spark-Konnektor so konfiguriert, dass die Validierung von vorherigen und nachfolgenden Aktionen für die gemeinsame Nutzung von Sitzungen deaktiviert wird.

Der Standardwert ist FALSE.

Wichtig

Stellen Sie sich vor dem Einstellen dieser Option sicher, dass die Abfragen in den vorherigen und den nachfolgenden Aktionen keine Auswirkungen auf die Sitzungseinstellungen haben. Andernfalls könnten Probleme bei den Ergebnissen auftreten.

Diese Option wurde in Version 2.11.3 des Spark-Konnektors hinzugefügt.

Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation

Der Spark-Konnektor unterstützt Schlüsselpaar-Authentifizierung und Schlüsselrotation.

  1. Im ersten Schritt führen Sie die Erstkonfiguration der Schlüsselpaar-Authentifizierung durch, wie unter Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation gezeigt.

  2. Senden Sie eine unverschlüsselte Kopie des privaten Schlüssels mithilfe der Verbindungsoption pem_private_key.

Achtung

Aus Sicherheitsgründen sollten Sie diesen pem_private_key-Schlüssel in Ihrer Anwendung nicht fest kodieren, sondern den Parameter dynamisch festlegen, nachdem Sie den Schlüssel aus einer sicheren Quelle gelesen haben. Wenn der Schlüssel verschlüsselt ist, entschlüsseln Sie ihn, und senden Sie die entschlüsselte Version.

Beachten Sie im Python-Beispiel, dass die pem_private_key-Datei rsa_key.p8:

  • direkt aus einer kennwortgeschützten Datei mit der Umgebungsvariablen PRIVATE_KEY_PASSPHRASE gelesen wird und

  • den Ausdruck pkb in der sfOptions-Zeichenfolge verwendet.

Um eine Verbindung herzustellen, können Sie das Python-Beispiel in einer Datei speichern (d. h. <file.py>) und dann den folgenden Befehl ausführen:

spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>
Copy

Python

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os

with open("<path>/rsa_key.p8", "rb") as key_file:
  p_key = serialization.load_pem_private_key(
    key_file.read(),
    password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
    backend = default_backend()
    )

pkb = p_key.private_bytes(
  encoding = serialization.Encoding.PEM,
  format = serialization.PrivateFormat.PKCS8,
  encryption_algorithm = serialization.NoEncryption()
  )

pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')

sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "pem_private_key" : pkb,
  "sfDatabase" : "<database>",
  "sfSchema" : "schema",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "COLORS") \
    .load()

df.show()
Copy

Verwenden von External OAuth

Ab Spark-Konnektor, Version 2.7.0 können Sie External OAuth zum Authentifizieren bei Snowflake mittels Scala-Beispielprogramm oder Python-Beispielskript verwenden.

Bevor Sie External OAuth und den Spark-Konnektor zur Authentifizierung bei Snowflake verwenden, konfigurieren Sie eine External OAuth-Sicherheitsintegration für einen der unterstützten External OAuth-Autorisierungsserver oder einen kundenspezifischen Client für External OAuth.

Beachten Sie in den Scala- und Python-Beispielen, dass der Parameter sfPassword durch die Parameter sfAuthenticator und sfToken ersetzt wird.

Scala:

// spark connector version

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.DataFrame

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<username>",
    "sfAuthenticator" -> "oauth",
    "sfToken" -> "<external_oauth_access_token>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "region")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Python:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfAuthenticator" : "oauth",
  "sfToken" : "<external_oauth_access_token>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()
Copy

AWS-Optionen für externe Datenübertragung

Diese Optionen werden verwendet, um den Amazon S3-Speicherort für die Speicherung temporärer Daten anzugeben und um Authentifizierungsdetails für den Zugriff auf den Speicherort bereitzustellen. Sie sind nur erforderlich, wenn Sie eine externe Datenübertragung durchführen. Externe Datenübertragungen sind erforderlich, wenn eine der folgenden Voraussetzungen erfüllt ist:

  • Sie verwenden die Version 2.1.x oder niedriger des Spark-Konnektors (der keine interne Übertragung unterstützt) oder

  • Ihre Übertragung dauert wahrscheinlich 36 Stunden oder länger (interne Übertragung verwendet temporäre Anmeldeinformationen, die nach 36 Stunden ablaufen).

tempDir

Der S3-Speicherort, an dem Daten zwischengespeichert werden (z. B. s3n://xy12345-bucket/spark-snowflake-tmp/).

Wenn tempDir angegeben ist, müssen Sie auch eine der beiden Optionen angeben:

  • awsAccessKey, awsSecretKey . oder

  • temporary_aws_access_key_id, temporary_aws_secret_access_key, temporary_aws_session_token

awsAccessKey, awsSecretKey

Dies sind standardmäßige AWS-Anmeldeinformationen, die den Zugriff auf den in tempDir angegebenen Speicherort erlauben. Beachten Sie, dass diese beiden Optionen zusammen festgelegt werden müssen.

Wenn beide festgelegt sind, können sie aus dem vorhandenen SparkContext-Objekt abgerufen werden.

Wenn Sie diese Variablen angeben, müssen Sie auch tempDir angeben.

Diese Anmeldeinformationen sollten auch für den Hadoop-Cluster festgelegt werden.

temporary_aws_access_key_id, temporary_aws_secret_access_key, temporary_aws_session_token

Dies sind temporäre AWS-Anmeldeinformationen, die den Zugriff auf den in tempDir angegebenen Speicherort ermöglichen. Beachten Sie, dass alle drei Optionen zusammen festgelegt werden müssen.

Wenn diese Optionen festgelegt sind, haben sie Vorrang vor den Optionen awsAccessKey und awsSecretKey.

Wenn Sie temporary_aws_access_key_id, temporary_aws_secret_access_key und temporary_aws_session_token angeben, müssen Sie auch tempDir angeben. Andernfalls werden diese Parameter ignoriert.

check_bucket_configuration

Wenn auf on (Standard) gesetzt, prüft der Konnektor, ob für den Bucket, der zur Datenübertragung verwendet wird, eine Lebenszyklusrichtlinie konfiguriert ist (siehe Vorbereiten eines externen AWS S3-Buckets für weitere Informationen). Wenn keine Lebenszyklusrichtlinie vorhanden ist, wird eine Warnung protokolliert.

Wenn Sie diese Option deaktivieren (durch Setzen auf off), wird diese Prüfung übersprungen. Dies kann hilfreich sein, wenn ein Benutzer auf die Bucketdatenoperationen zugreifen darf, nicht aber auf die Lebenszyklusrichtlinien des Buckets. Das Deaktivieren der Option kann auch die Ausführungszeiten von Abfragen etwas beschleunigen.

Weitere Details dazu finden Sie unter Authentifizierung von S3 für den Datenaustausch (unter diesem Thema).

Azure-Optionen für externe Datenübertragung

In diesem Abschnitt werden die Parameter beschrieben, die für Azure Blob-Speicher bei externen Datenübertragungen gelten. Externe Datenübertragungen sind erforderlich, wenn eine der folgenden Voraussetzungen erfüllt ist:

  • Sie verwenden die Version 2.1.x oder niedriger des Spark-Konnektors (der keine interne Übertragung unterstützt) oder

  • Ihre Übertragung dauert wahrscheinlich 36 Stunden oder länger (interne Übertragung verwendet temporäre Anmeldeinformationen, die nach 36 Stunden ablaufen).

Wenn Sie eine externe Übertragung mit Azure Blob-Speicher verwenden, geben Sie den Speicherort des Azure-Containers und die SAS (Freigabesignatur) für diesen Container mit den unten beschriebenen Parametern an.

tempDir

Der Azure Blob-Speichercontainer, in dem Daten zwischengespeichert werden. Dies hat die Form einer URL, zum Beispiel:

wasb://<Azure-Container>@<Azure-Konto>.<Azure-Endpunkt>/

temporary_azure_sas_token

Geben Sie das SAS-Token für den Azure Blob-Speicher an.

Weitere Details dazu finden Sie unter Authentifizieren von Azure für den Datenaustausch (unter diesem Thema).

Angeben von Azure-Informationen für temporären Speicher in Spark

Wenn Sie mit Azure Blob Storage einen temporären Speicher zum Übertragen von Daten zwischen Spark und Snowflake bereitstellen, müssen Sie neben dem Snowflake-Konnektor für Spark den Speicherort und die Anmeldeinformationen für den temporären Speicher angeben.

Um Spark die Angaben zum temporären Speicherort bereitzustellen, führen Sie in Ihrem Spark-Cluster beispielsweise folgende Befehle aus:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Copy

Beachten Sie, dass der letzte Befehl die folgenden Variablen enthält:

  • <Container> und <Konto>: Dabei handelt es sich um den Container- und Kontoname für Ihre Azure-Bereitstellung.

  • <Azure-Endpunkt>: Dies ist der Endpunkt Ihres Azure-Bereitstellungsorts. Wenn Sie beispielsweise eine Azure US-Bereitstellung verwenden, ist der Endpunkt wahrscheinlich blob.core.windows.net.

  • <Azure-SAS>: Dies ist das Shared Access Signature-Sicherheitstoken.

Ersetzen Sie jede dieser Variablen durch die passenden Informationen für Ihr Azure Blob Storage-Konto.

Übergeben von Snowflake-Sitzungsparametern als Optionen für den Konnektor

Der Snowflake-Konnektor für Spark unterstützt das Senden beliebiger Parameter auf Sitzungsebene an Snowflake (siehe Sitzungsparameter für weitere Informationen). Dies kann durch Hinzufügen eines ("<Schlüssel>" -> "<Wert>")-Paares zum options-Objekt erreicht werden, wobei <Schlüssel> der Sitzungsparametername und <Wert> der Wert ist.

Bemerkung

Der <Wert> sollte eine Zeichenfolge sein, die in doppelte Anführungszeichen eingeschlossen ist, auch für Parameter, die Zahlen oder boolesche Werte akzeptieren (z. B. "1" oder "true").

Im folgenden Codebeispiel wird der Sitzungsparameter USE_CACHED_RESULT mit dem Wert "false" übergeben, wodurch die Verwendung der Ergebnisse zuvor ausgeführter Abfragen deaktiviert wird:

// ... assuming sfOptions contains Snowflake connector options

// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method
Copy

Sicherheitshinweise

Kunden sollten sicherstellen, dass in einem Spark-System mit mehreren Knoten die Kommunikation zwischen den Knoten sicher ist. Der Spark-Master sendet Snowflake-Anmeldeinformationen an Spark-Worker, damit diese auf Snowflake-Stagingbereiche zugreifen können. Wenn die Kommunikation zwischen dem Spark-Master und den Spark-Worker nicht sicher ist, können die Anmeldeinformationen von einem nicht autorisierten Dritten gelesen werden.

Authentifizierung von S3 für den Datenaustausch

In diesem Abschnitt wird die Authentifizierung bei der Verwendung von S3 für den Datenaustausch beschrieben.

Die Ausführung dieser Aufgabe ist nur in einer der folgenden Situationen erforderlich:

  • Die Version des Snowflake-Konnektors für Spark ist 2.1.x (oder niedriger). Ab Version 2.2.0 verwendet der Konnektor für den Datenaustausch einen internen temporären Stagingbereich von Snowflake. Wenn Sie derzeit nicht die Version 2.2.0 (oder höher) des Konnektors verwenden, empfiehlt Snowflake dringend ein Upgrade auf die neueste Version.

  • Die Version des Snowflake-Konnektors für Spark ist 2.2.0 (oder höher), aber Ihre Jobs dauern regelmäßig länger als 36 Stunden. Dies ist die maximale Dauer für das AWS-Token, das der Konnektor verwendet, um auf den internen Stagingbereich für den Datenaustausch zuzugreifen.

Wenn Sie eine ältere Version des Konnektors verwenden, müssen Sie einen S3-Speicherort vorbereiten, den der Konnektor zum Datenaustausch zwischen Snowflake und Spark verwenden kann.

Um den Zugriff auf den S3-Bucket/das Verzeichnis für den Datenaustausch zwischen Spark und Snowflake zu ermöglichen (wie für tempDir angegeben), werden zwei Authentifizierungsmethoden unterstützt:

  • Permanente AWS-Anmeldeinformationen (auch zur Konfiguration der Hadoop/Spark-Authentifizierung für den Zugriff auf S3 verwendet)

  • Temporäre AWS-Anmeldeinformationen

Verwenden von permanenten AWS-Anmeldeinformationen

Dies ist die Standard-Authentifizierungsmethode für AWS. Sie erfordert ein Paar aus awsAccessKey- und awsSecretKey-Werten.

Bemerkung

Diese Werte sollten auch verwendet werden, um Hadoop/Spark für den Zugriff auf S3 zu konfigurieren. Weitere Informationen dazu, einschließlich Beispiele, finden Sie unter Authentifizieren von Hadoop/Spark mit S3A oder S3N (unter diesem Thema).

Beispiel:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)
Copy

Details zu den von sfOptions unterstützten Optionen finden Sie unter AWS-Optionen für externe Datenübertragung (unter diesem Thema).

Authentifizieren von Hadoop/Spark mit S3A oder S3N

Hadoop/Spark-Ökosysteme unterstützen 2 URI-Schemas für den Zugriff auf S3:

s3a://

Neue, empfohlene Methode (für Hadoop 2.7 und höher)

Um diese Methode zu verwenden, ändern Sie die Scala-Beispiele unter diesem Thema, und fügen Sie dann die folgenden Hadoop-Konfigurationsoptionen hinzu:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)
Copy

Stellen Sie sicher, dass die Option tempdir ebenfalls s3a:// verwendet.

s3n://

Ältere Methode (für Hadoop 2.6 und niedriger)

In einigen Systemen ist es notwendig, sie explizit anzugeben, wie im folgenden Scala-Beispiel gezeigt:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)
Copy

Verwenden von temporären AWS-Anmeldeinformationen

Bei dieser Methode werden die Konfigurationsoptionen temporary_aws_access_key_id temporary_aws_secret_access_key und temporary_aws_session_token für den Konnektor verwendet.

Diese Methode bietet zusätzliche Sicherheit, da Snowflake nur temporären Zugriff auf den S3-Bucket bzw. das Verzeichnis für den Datenaustausch erhält.

Bemerkung

Temporäre Anmeldeinformationen können nur zur Konfiguration der S3-Authentifizierung für den Konnektor verwendet werden. Sie können nicht zur Konfiguration der Hadoop/Spark-Authentifizierung verwendet werden.

Wenn Sie temporäre Anmeldeinformationen angeben, haben diese Vorrang vor allen anderen permanenten Anmeldeinformationen.

Der folgende Scala-Code zeigt ein Beispiel für das Authentifizieren mit temporären Anmeldeinformationen:

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as temporary_aws_access_key_id are, of course,
// also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())
Copy

sfOptions2 kann nun mit der options()-DataFrame -Methode verwendet werden.

Authentifizieren von Azure für den Datenaustausch

In diesem Abschnitt wird beschrieben, wie Sie sich authentifizieren, wenn Sie den Azure Blob-Speicher für den Datenaustausch verwenden.

Die Authentifizierung auf diese Weise ist nur erforderlich, wenn einer der folgenden Umstände gilt:

  • Die Version des Snowflake-Konnektors für Spark ist 2.1.x (oder niedriger). Ab Version 2.2.0 verwendet der Konnektor für den Datenaustausch einen internen temporären Stagingbereich von Snowflake. Wenn Sie derzeit nicht die Version 2.2.0 (oder höher) des Konnektors verwenden, empfiehlt Snowflake dringend ein Upgrade auf die neueste Version.

  • Die Version des Snowflake-Konnektors für Spark ist 2.2.0 (oder höher), aber Ihre Jobs dauern regelmäßig länger als 36 Stunden. Dies ist die maximale Dauer für das Azure-Token, das vom Konnektor verwendet wird, um für den Datenaustausch auf den internen Stagingbereich zuzugreifen.

Sie müssen einen externen Azure Blob-Speichercontainer vorbereiten, über den der Konnektor Daten zwischen Snowflake und Spark austauschen kann.

Verwenden von Azure-Anmeldeinformationen

Dies ist die Standard-Authentifizierungsmethode für Azure Blob-Speicher. Sie erfordert ein Wertepaar aus tempDir-Wert (eine URL) und temporary_azure_sas_token-Wert.

Bemerkung

Diese Werte sollten auch verwendet werden, um Hadoop/Spark für den Zugriff auf Azure Blob-Speicher zu konfigurieren. Weitere Informationen dazu, einschließlich Beispiele, finden Sie unter Authentifizieren von Hadoop/Spark mit Azure (unter diesem Thema).

Beispiel:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

// Then, configure your Snowflake environment
//
val sfOptions = Map(
  "sfURL" -> "<account_identifier>.snowflakecomputing.com",
  "sfUser" -> "<user_name>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database_name>",
  "sfSchema" -> "<schema_name>",
  "sfWarehouse" -> "<warehouse_name>",
  "sfCompress" -> "on",
  "sfSSL" -> "on",
  "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/",
  "temporary_azure_sas_token" -> "<azure_sas>"
)
Copy

Details zu den von sfOptions unterstützten Optionen finden Sie unter Azure-Optionen für externe Datenübertragung (unter diesem Thema).

Authentifizieren von Hadoop/Spark mit Azure

Um diese Methode zu verwenden, ändern Sie die Scala-Beispiele unter diesem Thema, und fügen Sie dann die folgenden Hadoop-Konfigurationsoptionen hinzu:

val hadoopConf = sc.hadoopConfiguration
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Copy

Stellen Sie sicher, dass die Option tempdir ebenfalls wasb:// verwendet.

Authentifizieren über einen Browser wird nicht unterstützt

Bei Verwendung des Spark-Konnektors ist es nicht praktisch, eine Form von Authentifizierung zu verwenden, bei der ein Browserfenster geöffnet wird, in dem der Benutzer zur Eingabe von Anmeldeinformationen aufgefordert wird. Dieses Fenster würde auf dem Clientcomputer nicht unbedingt angezeigt. Daher unterstützt der Spark-Konnektor keine Art von Authentifizierung, einschließlich MFA (Multi-Factor Authentication) oder SSO (Single Sign-On), bei der ein Fenster im Browser geöffnet werden würde.