Sparkコネクタの使用

コネクタは標準のSpark APIに準拠していますが、このトピックで説明するSnowflake固有のオプションが追加されています。

このトピックでは、用語 COPY は次の両方を指します:

  • COPY INTO <テーブル> (内部または外部のステージからテーブルにデータを転送するために使用されます)。

  • COPY INTO <場所> (テーブルから内部または外部ステージにデータを転送するために使用されます)。

このトピックの内容:

SnowCD を使用したSnowflakeへのネットワーク接続の検証

ドライバーを設定したら、 SnowCD を使用して、Snowflakeへのネットワーク接続を評価およびトラブルシューティングできます。

初期設定プロセス中にオンデマンドで SnowCD をいつでも使用して、Snowflakeへのネットワーク接続を評価およびトラブルシューティングできます。

プッシュダウン

Sparkコネクターは、 SQL 操作のSpark論理プランをキャプチャおよび分析することにより、述語とクエリプッシュダウンを適用します。データソースがSnowflakeの場合、操作は SQL クエリに変換され、Snowflakeで実行されてパフォーマンスが向上します。

ただし、この変換にはSpark SQL 演算子からSnowflake式へのほぼ1対1の変換が必要であるため、すべてのSpark SQL 演算子をプッシュダウンできるわけではありません。プッシュダウンが失敗すると、コネクタは最適化されていない実行プランにフォールバックします。サポートされていない操作は、代わりにSparkで実行されます。

以下は、プッシュダウンでサポートされている操作のリストです(以下のすべての関数は、Spark名を使用しています)。関数がこのリストにない場合、Snowflakeにプッシュダウンされるのではなく、それを利用するSparkプランがSparkで実行される場合があります。

  • 集計関数

    • Average

    • CorrCovPopulation

    • CovSample

    • Count

    • Max

    • Min

    • StddevPop

    • StddevSamp

    • Sum

    • VariancePop

    • VarianceSamp

  • ブール演算子

    • And

    • Between

    • Contains

    • EndsWith

    • EqualTo

    • GreaterThan

    • GreaterThanOrEqual

    • In

    • IsNull

    • IsNotNull

    • LessThan

    • LessThanOrEqual

    • Not

    • Or

    • StartsWith

  • 数学関数

    • 算術演算子「+」(加算)、「-」(減算)、「*」(乗算)、「/」(除算)、および「-」(単項否定)。

    • Abs

    • Acos

    • Asin

    • Atan

    • Ceil

    • Cos

    • Cosh

    • Exp

    • Floor

    • Greatest

    • Least

    • Log

    • Pi

    • Pow

    • Rand

    • Round

    • Sin

    • Sinh

    • Sqrt

    • Tan

    • Tanh

  • その他の演算子

    • Alias(AS 式)

    • BitwiseAnd

    • BitwiseNot

    • BitwiseOr

    • BitwiseXor

    • Cast(子、t、_)

    • DateAdd

    • If

    • MakeDecimal

    • ScalarSubquery

    • ShiftLeft

    • ShiftRight

    • SortOrder

    • UnscaledValue

  • 関係演算子

    • 集計関数とgroup-by句

    • Distinct

    • Filters

    • In

    • InSet

    • Joins

    • Limits

    • Projections

    • Sorts(ORDER BY)

    • UnionおよびUnion All

    • ウィンドウ関数とwindowing句

  • 文字列関数

    • Ascii

    • Concat(子)

    • Like

    • Lower

    • StringLPad

    • StringRPad

    • StringTranslate

    • StringTrim

    • StringTrimLeft

    • StringTrimRight

    • Substring

    • Upper

  • ウィンドウ関数(注:これらはSpark 2.2では機能しません)

    • RowNumber

Scalaでコネクタを使用する

データソースクラス名の指定

SnowflakeをSparkのデータソースとして使用するには、 .format オプションを使用して、データソースを定義するSnowflakeコネクタクラス名を指定します。

net.snowflake.spark.snowflake

クラス名のコンパイル時チェックを確実にするために、Snowflakeはクラス名に変数を定義することを強くお勧めします。例:

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

また、便宜上 Utils クラスは変数を提供し、次のようにインポートできます。

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

注釈

このトピックのすべての例では、クラス定義として SNOWFLAKE_SOURCE_NAME を使用しています。

セッションでのプッシュダウンの有効化/無効化

コネクタのバージョン2.1.0(以降)はクエリプッシュダウンをサポートします。これは、SnowflakeがSparkデータソースである場合にクエリ処理をSnowflakeにプッシュすることにより、パフォーマンスを大幅に改善できます。

デフォルトでは、プッシュダウンは有効になっています。

Sparkセッション内で無効にするには、 SparkSession オブジェクトをインスタンス化した後、次の静的メソッド呼び出しを呼び出します。

SnowflakeConnectorUtils.disablePushdownSession(spark)

sparkSparkSession オブジェクトです。

次のメソッドを呼び出すことにより、いつでもプッシュダウンを再度有効にできます。

SnowflakeConnectorUtils.enablePushdownSession(spark)

SnowflakeからSparkへのデータの移動

注釈

DataFramesを使用する場合、Snowflakeコネクタは SELECT クエリのみをサポートします。

SnowflakeからSpark DataFrame にデータを読み込むには、

  1. read() メソッド( SqlContext オブジェクトの)を使用して DataFrameReader を構成します。

  2. format() メソッドを使用して、 SNOWFLAKE_SOURCE_NAME を指定します。定義については、 データソースクラス名の指定 (このトピック)をご参照ください。

  3. option() または options() メソッドを使用して、コネクタオプションを指定します。詳細については、 コネクタの構成オプションの設定 (このトピック)をご参照ください。

  4. 読み取るテーブルデータに次のオプションのいずれかを指定します。

    • dbtable :読み取るテーブルの名前。すべての列とレコードが取得されます(つまり、 SELECT * FROM データベーステーブル と同等)。

    • query :実行する正確なクエリ(SELECT ステートメント)。

使用上の注意

  • 現在、 DataFrames を使用するときに、コネクタは他の種類のクエリ(例: SHOW 、 DESC、または DML ステートメント)をサポートしません。

  • 個々の行のサイズには上限があります。詳細については、 クエリテキストサイズの制限 をご参照ください。

パフォーマンスの考慮事項

SnowflakeとSparkの間でデータを転送する場合、次の方法を使用してパフォーマンスを分析/改善します。

  • net.snowflake.spark.snowflake.Utils.getLastSelect() メソッドを使用して、SnowflakeからSparkにデータを移動するときに発行される実際のクエリを確認します。

  • Spark DataFrameの filter または where 機能を使用する場合、発行された SQL クエリにそれぞれのフィルターが存在することを確認してください。Snowflakeコネクタは、Sparkによって要求されたすべてのフィルターを SQLに変換しようとします。

    ただし、現在のSparkインフラストラクチャがSnowflakeコネクタに渡さないフィルターの形式があります。その結果、状況によっては、Snowflakeから多数の不要なレコードが要求されます。

  • 列のサブセットのみが必要な場合は、 SQL クエリでサブセットが反映されていることを確認してください。

  • 一般に、発行された SQL クエリが DataFrame 操作に基づいて期待するものと一致しない場合、 query オプションを使用して、希望する正確な SQL 構文を提供します。

テーブル全体を読む:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

クエリの結果を読み取ります:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
    .load()

SparkからSnowflakeへのデータの移動

DataFrame の内容をSnowflakeのテーブルに保存する手順は、SnowflakeからSparkへの書き込みと類似しています。

  1. DataFramewrite() メソッドを使用して DataFrameWriter を構成します。

  2. format() メソッドを使用して、 SNOWFLAKE_SOURCE_NAME を指定します。定義については、 データソースクラス名の指定 (このトピック)をご参照ください。

  3. option() または options() メソッドを使用して、コネクタオプションを指定します。詳細については、 コネクタの構成オプションの設定 (このトピック)をご参照ください。

  4. データを書き込むテーブルを指定するには、 dbtable オプションを使用します。

  5. コンテンツの保存モードを指定するには、 mode() メソッドを使用します。

    詳細については、 SaveMode (Sparkドキュメント)をご参照ください。

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

SparkからSnowflakeへの JSON のエクスポート

Spark DataFrames には、文字列としてシリアル化された JSON オブジェクトを含めることができます。次のコードは、通常の DataFrame を JSON データを含む DataFrame に変換する例を示しています。

val rdd = myDataFrame.toJSON
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)

結果の jsonDataFrame には、タイプ StringType の単一列が含まれています。その結果、この DataFrame が共通の SaveMode.Overwrite モードでSnowflakeにエクスポートされると、新しい表がSnowflakeにタイプ VARCHAR の単一列で作成されます。

jsonDataFrameVARIANT 列にロードするには、

  1. Snowflakeテーブルを作成します(Snowflake JDBC ドライバーを使用してScala内からSnowflakeに接続)。Snowflake JDBC ドライバーの接続パラメーターの説明については、 JDBC ドライバーを構成する をご参照ください。

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class SnowflakeJDBCExample {
      public static void main(String[] args) throws Exception {
        String jdbcUrl = "jdbc:snowflake://xy12345.snowflakecomputing.com/";
    
        Properties properties = new Properties();
        properties.put("user", "peter");
        properties.put("password", "test");
        properties.put("account", "xy12345");
        properties.put("warehouse", "mywh");
        properties.put("db", "mydb");
        properties.put("schema", "public");
    
        // get connection
        System.out.println("Create JDBC connection");
        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
        System.out.println("Done creating JDBC connection\n");
        // create statement
        System.out.println("Create JDBC statement");
        Statement statement = connection.createStatement();
        System.out.println("Done creating JDBC statement\n");
        // create a table
        System.out.println("Create my_variant_table table");
        statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
        statement.close();
        System.out.println("Done creating demo table\n");
    
        connection.close();
        System.out.println("Close connection\n");
      }
    }
    
  2. 既存のテーブルを再利用する場合は、 SaveMode.Overwrite の代わりに、 SaveMode.Append を使用します。 JSON を表す文字列値がSnowflakeにロードされると、ターゲット列が VARIANT型であるため、 JSONとして解析されます。例:

    df.write
        .format(SNOWFLAKE_SOURCE_NAME)
        .options(sfOptions)
        .option("dbtable", "my_variant_table")
        .mode(SaveMode.Append)
        .save()
    

DDL/DML SQL ステートメントの実行

クエリに加えて、 runQuery() メソッド( Utils オブジェクトの)を使用して、 DDL/DML SQL ステートメントを実行します。例:

var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
    )
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")

sfOptions は DataFramesの読み書きに使用されるパラメーターマップです。

runQuery メソッドは TRUE または FALSE のみを返します。結果セットを返さないステートメント、例えば DDL ステートメント(CREATE TABLE のような)、および DML ステートメント( INSERTUPDATEDELETE のような)を対象としています。 SELECTSHOW などの結果セットを返すステートメントでは有効ではありません。

タイムスタンプとタイムゾーンの操作

Sparkでは、Scala/Java Timestamp型と同等のタイムスタンプ1種類のみを提供します。Snowflakeの TIMESTAMP_LTZ (現地時間帯)データ型の動作とほぼ同様です。そのため、SparkとSnowflakeの間でデータを転送するときは、タイムゾーンに準じた時間を正しく保つために、次の方法を使用することをお勧めします。

  • Snowflakeでは、 TIMESTAMP_LTZ データ型のみを使用します。

    注釈

    デフォルトのタイム・スタンプ・データ・タイプ・マッピングは TIMESTAMP_NTZ (タイムゾーンなし)であるため、 必ず TIMESTAMP_TYPE_MAPPING パラメーターにより TIMESTAMP_LTZ の使用を明示的に設定します。

  • Sparkのタイムゾーンを UTC に設定し、Snowflakeでこのタイムゾーンを使用します(例:コネクタに sfTimezone オプションを設定しないでください。また、Snowflakeでタイムゾーンを明示的に設定しないでください)。このシナリオでは、 TIMESTAMP_LTZ と TIMESTAMP_NTZ は事実上同等です。

    タイムゾーンを設定するには、Sparkコードに次の行を追加します:

    java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
    

これらのアプローチのいずれも実装しない場合、望ましくない時間変更が発生する可能性があります。たとえば、次のシナリオを考えます:

  • Sparkのタイムゾーンは America/New_York に設定されています。

  • Snowflakeのタイムゾーンは Europe/Warsaw に設定されています。これは次のいずれかの方法で発生します。

    • コネクタの sfTimezoneEurope/Warsaw に設定しています。

    • コネクタの sfTimezonesnowflake に設定し、Snowflakeの TIMEZONE セッションパラメータを Europe/Warsaw に設定します。

  • Snowflakeでは TIMESTAMP_NTZ と TIMESTAMP_LTZ の両方が使用されています。

このシナリオでは、

  1. Snowflakeの TIMESTAMP_NTZ 列で 12:00:00 を表す値がSparkに送信された場合、この値にはタイムゾーン情報は含まれません。Sparkはこの値、ニューヨークの 12:00:00 として扱います。

  2. Sparkがこの値 12:00:00 (ニューヨーク)をSnowflakeに送り返して TIMESTAMP_LTZ 列にロードすると、自動的に変換され、 18:00:00 (ワルシャワタイムゾーン用)としてロードされます。

  3. この値がSnowflakeで TIMESTAMP_NTZ に変換されると、ユーザーには 18:00:00 が表示されますが、これは元の値 12:00:00 とは異なります。

要約すると、これらのルールの少なくとも1つを 少なくとも 厳守することをお勧めします。

  • SparkとSnowflakeの両方に、 同じ タイムゾーン、理想的には UTC を使用します。

  • SparkとSnowflakeの間でデータを転送するには、 TIMESTAMP_LTZ データ型 のみ を使用します。

サンプルScalaプログラム

重要

このサンプルプログラムは、バージョン2.2.0(またはそれ以上)のコネクタを使用していることを前提としています。Snowflake内部ステージを使用して一時データを保存するため、一時データの保存にS3の場所は必要ありません。以前のバージョンを使用している場合は、既存のS3の場所があり、 tempdirawsAccessKeyawsSecretKey の値( sfOptions の)を含める必要があります。詳細については、 外部データ転送の AWS オプション (このトピック)をご参照ください。

次のScalaプログラムは、Spark用Snowflakeコネクタの完全な使用例を提供します。コードを使用する前に、 コネクタの構成オプションの設定 (このトピック内)で説明されているように、次の文字列を適切な値に置き換えます。

  • <アカウント名> :アカウントの名前(Snowflakeが提供)。

  • <ユーザー名><パスワード> :Snowflakeユーザーのログイン認証情報。

  • <データベース><スキーマ><ウェアハウス> :Snowflakeセッションのデフォルト。

サンプルのScalaプログラムは、基本認証(つまり、ユーザー名とパスワード)を使用します。 OAuth で認証する場合は、 外部 OAuth の使用(このトピック内)をご参照ください。

import org.apache.spark.sql._

//
// Configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) from t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Pythonでのコネクタの使用

Pythonでコネクタを使用することは、Scalaの使用法に非常に似ています。

Sparkディストリビューションに含まれている bin/pyspark スクリプトを使用することをお勧めします。

pyspark スクリプトの構成

pyspark スクリプトを spark-shell スクリプトと同様に構成する必要があります( --packages または --jars オプションを使用)。例:

bin/pyspark --packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4

Snowflake Sparkコネクタおよび JDBC コネクタ.jarファイルを CLASSPATH 環境変数に含めることを忘れないでください。

spark-shell スクリプトの構成の詳細については、 ステップ4:ローカルSparkクラスターまたはAmazon EMR がホストするSpark環境を構成する をご参照ください。

セッションでのプッシュダウンの有効化/無効化

コネクタのバージョン2.1.0(以降)はクエリプッシュダウンをサポートします。これは、SnowflakeがSparkデータソースである場合にクエリ処理をSnowflakeにプッシュすることにより、パフォーマンスを大幅に改善できます。

デフォルトでは、プッシュダウンは有効になっていません。Sparkセッション内で有効にするには、 SparkSession オブジェクトをインスタンス化した後、次の静的メソッド呼び出しを呼び出します:

SnowflakeConnectorUtils.enablePushdownSession()

例:

sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

この例では、 scSparkSession オブジェクトです。

disablePushdownSession() メソッドを呼び出すことにより、いつでも無効にすることができます。例:

sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

サンプルPythonスクリプト

重要

このサンプルスクリプトは、バージョン2.2.0(またはそれ以上)のコネクタを使用していることを前提としています。Snowflake内部ステージを使用して一時データを保存するため、このデータの保存にS3の場所は必要ありません。以前のバージョンを使用している場合は、既存のS3の場所があり、 tempdirawsAccessKeyawsSecretKey の値( sfOptions の)を含める必要があります。詳細については、 外部データ転送の AWS オプション (このトピック)をご参照ください。

pyspark スクリプトを構成したら、 SQL クエリおよびその他の操作を実行できます。以下は、単純な SQL クエリを実行するPythonスクリプトの例です。このスクリプトは、基本的なコネクタの使用法を示しています。このドキュメントのScalaの例のほとんどは、Pythonで使用するための最小限の労力/変更で適応できます。

サンプルPythonスクリプトは、基本認証(つまり、ユーザー名とパスワード)を使用します。 OAuth で認証する場合は、 外部 OAuth の使用(このトピック内)をご参照ください。

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<YOUR_APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<YOUR_AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<YOUR_AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_name>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

ちなみに

sfOptionsSNOWFLAKE_SOURCE_NAME の使用に注意してください。これにより、コードが簡素化され、エラーが発生する可能性が低くなります。

sfOptions でサポートされているオプションの詳細については、 コネクタの構成オプションの設定 (このトピック)をご参照ください。

データ型マッピング

Sparkコネクタは、多くの一般的なデータ型間の変換をサポートしています。

Spark SQL からSnowflakeへ

Spark データ タイプ

Snowflake データ タイプ

ArrayType

VARIANT

BinaryType

サポート対象外

BooleanType

BOOLEAN

ByteType

INTEGER.Snowflakeは BYTE タイプをサポートしていません。

DateType

DATE

DecimalType

DECIMAL

DoubleType

DOUBLE

FloatType

FLOAT

IntegerType

INTEGER

LongType

INTEGER

MapType

VARIANT

ShortType

INTEGER

StringType

長さが指定されている場合、 VARCHAR(N)、そうではない場合、 VARCHAR

StructType

VARIANT

TimeType

サポート対象外

TimestampType

TIMESTAMP

SnowflakeからSpark SQLまで

Snowflake データ タイプ

Spark データ タイプ

ARRAY

StringType

BIGINT

DecimalType(38, 0)

BINARY

サポート対象外

BLOB

サポート対象外

BOOLEAN

BooleanType

CHAR

StringType

CLOB

StringType

DATE

DateType

DECIMAL

DecimalType

DOUBLE

DoubleType

FLOAT

DoubleType

INTEGER

DecimalType(38, 0)

OBJECT

StringType

TIMESTAMP

TimestampType

TIME

StringType (Sparkコネクタバージョン2.4.14以降)

VARIANT

StringType

コネクタの構成オプションの設定

次のオプションは、コネクタの動作を構成します。それらは、 Spark DataframeReader クラス.option(<キー>, <値>) または .options(<マップ>) を使用して指定できます。

ちなみに

オプションの使用を容易にするために、単一の Map 変数に保存し、 .options() APIを使用することをお勧めします。

必要な接続オプション

Snowflakeに接続するには、次のオプションが必要です。

sfUrl

アカウントの ホスト名 を次の形式で指定します。

アカウント名.snowflakecomputing.com

ただし、完全なアカウント名には、アカウントがホストされている 地域 および クラウドプラットフォーム を識別する 追加 のセグメントが含まれている場合があります。

地域別のアカウント名の例

アカウント名が xy12345 の場合、

クラウドプラットフォーム/地域

完全なアカウント名

AWS

US 西部(オレゴン)

xy12345

US 東部(オハイオ)

xy12345.us-east-2.aws

US 東部(バージニア北部)

xy12345.us-east-1

US 東部(商業組織、バージニア政府北部)

xy12345.us-east-1-gov.aws

カナダ(中部)

xy12345.ca-central-1.aws

EU (アイルランド)

xy12345.eu-west-1

EU (フランクフルト)

xy12345.eu-central-1

アジア太平洋(東京)

xy12345.ap-northeast-1.aws

アジア太平洋(ムンバイ)

xy12345.ap-south-1.aws

アジア太平洋(シンガポール)

xy12345.ap-southeast-1

アジア太平洋(シドニー)

xy12345.ap-southeast-2

GCP

US 中央部1(アイオワ)

xy12345.us-central1.gcp

ヨーロッパ西部2(ロンドン)

xy12345.europe-west2.gcp

ヨーロッパ西部4(オランダ)

xy12345.europe-west4.gcp

Azure

西 US 2(ワシントン)

xy12345.west-us-2.azure

東 US 2(バージニア)

xy12345.east-us-2.azure

US 政府バージニア

xy12345.us-gov-virginia.azure

カナダ中央部(トロント)

xy12345.canada-central.azure

西ヨーロッパ(オランダ)

xy12345.west-europe.azure

スイス北部(チューリッヒ)

xy12345.switzerland-north.azure

東南アジア(シンガポール)

xy12345.southeast-asia.azure

オーストラリア東部(ニューサウスウェールズ)

xy12345.australia-east.azure

重要

次のいずれかの条件に該当する場合、アカウント名はこの例の構造とは異なります。

  • Snowflake Editionが VPS の場合、アカウント名の詳細については Snowflakeサポート にお問い合わせください。

  • AWS PrivateLink がアカウントで有効になっている場合、アカウント名には追加の privatelink セグメントが 必要 です。詳細については、 AWS PrivateLink とSnowflake をご参照ください。

sfUser

Snowflakeユーザーのログイン名。

sfPassword

Snowflakeユーザーのパスワード。

認証には、 sfPasswordpem_private_key、または sfAuthenticator のいずれかのオプションを使用する必要があることに注意してください。

pem_private_key

キーペア認証用のシークレットキー( PEM 形式)。手順については、このトピックの キーペア認証の使用 をご参照ください。

認証には、 sfPasswordpem_private_key、または sfAuthenticator のいずれかのオプションを使用する必要があることに注意してください。

sfAuthenticator

外部 OAuth を使用し、Snowflakeに対して認証することを指定します。値を oauth に設定します。

認証には、 sfPasswordpem_private_key、または sfAuthenticator のいずれかのオプションを使用する必要があることに注意してください。

外部 OAuth を使用するには、 sfToken パラメーターを設定する必要があります。

sfToken

値を外部 OAuth アクセストークンに設定します。

この接続パラメーターでは、 sfAuthenticator パラメーター値を oauth に設定する必要があります。

デフォルトはありません。

必要なコンテキストオプション

セッションのデータベースとスキーマコンテキストを設定するには、次のオプションが必要です:

sfDatabase

接続後にセッションに使用するデータベース。

sfSchema

接続後にセッションに使用するスキーマ。

追加オプション

残りのすべてのオプションは、必須ではありません。

sfAccount

アカウント名(例: xy12345)。アカウント名が sfUrl で指定されているため、このオプションは不要になりました。ここでは、下位互換性のためにのみ文書化されています。

sfWarehouse

接続後にセッションに使用するデフォルトの仮想ウェアハウス。

sfRole

接続後にセッションに使用するデフォルトのセキュリティロール。

sfTimezone

Sparkでの作業時にSnowflakeが使用するタイムゾーン。パラメーターはSnowflakeのタイムゾーンのみを設定することに注意してください。Spark環境は変更されません。サポートされている値は次のとおりです。

  • spark :Sparkのタイムゾーンを使用します(デフォルト)。

  • snowflake :Snowflakeの現在のタイムゾーンを使用します。

  • sf_default :接続しているSnowflakeユーザーのデフォルトのタイムゾーンを使用します。

  • タイムゾーン :有効な場合、特定のタイムゾーン(例: America/New_York)を使用します。

    このオプションの設定の影響の詳細については、 タイムスタンプとタイムゾーンの操作 (このトピック内)をご参照ください。

sfCompress

on (デフォルト)に設定すると、SnowflakeとSparkの間で渡されるデータが圧縮されます。

s3MaxFileSize

SnowflakeからSparkにデータを移動するときに使用されるファイルのサイズ。デフォルトは 10MBです。

parallelism

SnowflakeとSpark間のデータのアップロードとダウンロードに使用するスレッドプールのサイズ。デフォルトは4です。

一般に、スループットを増加または減少させる特定のニーズがない限り、この値を変更する必要はありません。Sparkアプリケーションの並列処理は、パーティションとエグゼキューターによって最適に管理されます。また、高レベルのスループットを実現するために、並列度を任意の大きな数値に設定しないでください。これにより、アップロード/ダウンロードの速度が低下する可能性があるなど、マイナスの意図しない効果が生じる可能性があります。

例:

df.write
.format(SNOWFLAKE_SOURCE_NAME)
.option("parallelism", "8")
.mode(SaveMode.Overwrite)
.save()
preactions

SparkとSnowflakeの間でデータが転送される前に実行される SQL コマンドのセミコロン区切りリスト。

SQL コマンドに %s が含まれる場合、 %s は操作で参照されるテーブル名に置き換えられます。

postactions

SparkとSnowflakeの間でデータが転送される後に実行される SQL コマンドのセミコロン区切りリスト。

SQL コマンドに %s が含まれる場合、これは操作で参照されるテーブル名に置き換えられます。

truncate_columns

on (デフォルト)に設定すると、 COPY コマンドはターゲット列の長さを超えるテキスト文字列を自動的に切り捨てます。 off に設定した場合、ロードされた文字列がターゲット列の長さを超えると、コマンドはエラーを生成します。

truncate_table

このパラメーターは、そのテーブルを上書きするときに、SnowflakeがSnowflakeターゲットテーブルのスキーマを保持するかどうかを制御します。

デフォルトでは、Snowflakeのターゲットテーブルが上書きされると、そのターゲットテーブルのスキーマも上書きされます。新しいスキーマは、ソーステーブル(Sparkデータフレーム)のスキーマに基づいています。

ただし、ソースのスキーマが理想的でない場合があります。例えば、ユーザーは、最初のソース列のデータ型が INTEGERであっても、Snowflakeターゲットテーブルが将来 FLOAT 値を格納できるようにしたい場合があります。その場合、Snowflakeテーブルのスキーマを上書きしないでください。 Snowflakeテーブルは単に切り捨ててから、現在のスキーマで再利用する必要があります。

このパラメーターの可能な値は次のとおりです:

  • on

  • off

このパラメーターが on の場合、ターゲットテーブルの元のスキーマが保持されます。このパラメーターが off の場合、テーブルの古いスキーマは無視され、ソースのスキーマに基づいて新しいスキーマが生成されます。

このパラメーターはオプションです。

このパラメーターのデフォルト値は off です(つまり、デフォルトでは元のテーブルスキーマが上書きされます)。

Sparkデータ型からSnowflakeデータ型へのマッピング(およびその逆)の詳細については、 データ型マッピング (このトピック)をご参照ください。

continue_on_error

この変数は、ユーザーが無効なデータ(たとえば、バリアントデータ型列の無効な JSON 形式)を入力した場合に COPY コマンドを中止するかどうかを制御します。

可能な値は次のとおりです:

  • on

  • off

on は、エラーが発生しても続行することを意味します。値 off は、エラーが発生した場合に中止することを意味します。

このパラメーターはオプションです。

このパラメーターのデフォルト値は off です。

このオプションをオンにすることはお勧めしません。Sparkコネクタを使用して COPYing からSnowflakeにエラーが報告された場合、データが失われる可能性があります。

注釈

行が拒否されたまたは欠落している場合、入力ソースでそれらの行に明らかに欠陥がない場合は、Snowflakeに報告してください。

usestagingtable

このパラメーターは、データのロードでステージングテーブルを使用するかどうかを制御します。

ステージングテーブルは、コネクタによって作成される通常のテーブル(一時的な名前を持つ)です。データのロード操作が成功すると、元のターゲットテーブルが削除され、ステージングテーブルの名前が元のターゲットテーブルの名前に変更されます。データのロード操作が失敗した場合、ステージングテーブルは削除され、ターゲットテーブルには操作の直前に保持していたデータが残ります。したがって、ステージングテーブルを使用すると、操作が失敗した場合に元のターゲットテーブルデータを保持できます。安全のため、Snowflakeはほとんどの状況でステージングテーブルを使用することを強くお勧めします。

コネクタがステージングテーブルを作成するには、Sparkコネクタを介して COPY を実行するユーザーに、テーブルを作成するための十分な権限が必要です。ユーザーがテーブルを作成する権限を持っていない場合、直接ロード(つまり、ステージングテーブルを使用しない読み込み)は便利です。

このパラメーターの可能な値は次のとおりです:

  • on

  • off

パラメーターが on の場合、ステージングテーブルが使用されます。このパラメーターが off の場合、データはターゲットテーブルに直接ロードされます。

このパラメーターはオプションです。

このパラメーターのデフォルト値は on です(つまり、ステージングテーブルを使用します)。

autopushdown

このパラメーターは、自動クエリプッシュダウンを有効にするかどうかを制御します。

プッシュダウンが有効になっている場合、クエリがSparkで実行されるときに、クエリの一部をSnowflakeサーバーに「プッシュダウン」できる場合、プッシュダウンされます。これにより、一部のクエリのパフォーマンスが向上します。

このパラメーターはオプションです。

コネクタがSparkの互換バージョンに接続されている場合、デフォルト値は on です。それ以外の場合、デフォルト値は off です。

コネクタが意図されているものとは異なるバージョンのSparkに接続されている場合(たとえば、コネクタのバージョン2.3がSparkのバージョン2.2に接続されている場合)、このパラメータが on に設定されていても自動プッシュダウンは無効になります。

purge

これが on に設定されている場合、コネクタは外部データ転送を介してSparkからSnowflakeに転送するときに作成された一時ファイルを削除します。このパラメーターが off に設定されている場合、これらのファイルはコネクターによって自動的に削除されません。

パージは、SparkからSnowflakeへの転送に対してのみ機能し、SnowflakeからSparkへの転送には機能しません。

可能な値は次のとおりです

  • on

  • off

デフォルト値は off です。

columnmap

このパラメーターは、SparkからSnowflakeにデータを書き込み、Snowflakeテーブルの列名がSparkテーブルの列名と一致しない場合に役立ちます。各Snowflake宛先列に対応するSparkソース列を示すマップを作成できます。

パラメータは、次の形式の単一の文字列リテラルです。

"Map(col_2 -> col_b, col_3 -> col_a)"

たとえば、次のシナリオを考えます:

  • Sparkの df という名前のデータフレームには3つの列があります:

    col_1col_2col_3

  • Snowflakeの tb という名前のテーブルには2つの列があります:

    col_acol_b

  • 次の値をコピーします:

    • df.col_2 から tb.col_b に。

    • df.col_3 から tb.col_a に。

columnmap パラメーターの値は次のとおりです:

Map(col_2 -> col_b, col_3 -> col_a)

次のScalaコードを実行して、この値を生成できます。

Map("col_2"->"col_b","col_3"->"col_a").toString()

このパラメーターのデフォルト値はnullです。つまり、デフォルトでは、ソース表と宛先表の列名は一致する必要があります。

このパラメーターは、SparkからSnowflakeに書き込むときにのみ使用されます。SnowflakeからSparkに書き込む場合は適用されません。

keep_column_case

SparkからSnowflakeにテーブルを書き込む場合、Sparkコネクタはデフォルトで、列名が二重引用符で囲まれていない限り、列名の文字を大文字にシフトします。

SnowflakeからSparkにテーブルを書き込む場合、Sparkコネクタはデフォルトで、大文字、アンダースコア、数字を除くすべての文字を含む列名を二重引用符で囲みます。

keep_column_caseを on に設定すると、Sparkコネクタはこれらの変更を行いません。

可能な値は次のとおりです:

  • on

  • off

デフォルト値は off です。

column_mapping

コネクタは、Sparkデータフレームの列をSnowflakeテーブルにマップする必要があります。これは、列名に基づいて(順序に関係なく)、または列の順序に基づいて実行できます(つまり、データフレームの最初の列は、列名に関係なく、テーブルの最初の列にマップされます)。

デフォルトでは、マッピングは順序に基づいて行われます。このパラメーターを name に設定することでオーバーライドできます。これは、コネクタに列名に基づいて列をマップするよう指示します。(名前のマッピングでは大文字と小文字は区別されません。)

このパラメーターの可能な値は次のとおりです:

  • order

  • name

デフォルト値は order です。

column_mismatch_behavior

このパラメーターは、 column_mapping パラメーターが name に設定されている場合にのみ適用されます。

Sparkデータフレームの列名とSnowflakeテーブルが一致しない場合、

  • column_mismatch_behaviorerror の場合、Sparkコネクタはエラーを報告します。

  • column_mismatch_behaviorignore の場合、Sparkコネクタはエラーを無視します。

    • ドライバーは、Snowflakeテーブルに対応する列がないSparkデータフレームの列をすべて破棄します。

    • ドライバーは、Sparkデータフレームに対応する列がないSnowflakeテーブルの列に NULLs を挿入します。

潜在的なエラーは次のとおりです:

  • Sparkデータフレームには、大文字と小文字を除いて同一の列を含めることができます。列名のマッピングでは大文字と小文字が区別されないため、データフレームからテーブルへの正しいマッピングを決定することはできません。

  • Snowflakeテーブルには、大文字と小文字を除いて同一の列を含めることができます。列名のマッピングでは大文字と小文字が区別されないため、データフレームからテーブルへの正しいマッピングを決定することはできません。

  • SparkデータフレームとSnowflakeテーブルには、共通の列名がない場合があります。理論的には、Sparkコネクタはすべての行のすべての列に NULLs を挿入できますが、これは通常無意味であるため、 column_mismatch_behaviorignore に設定されていてもコネクタはエラーをスローします。

このパラメーターの可能な値は次のとおりです:

  • error

  • ignore

デフォルト値は error です。

time_output_format

このパラメーターを使用すると、ユーザーは、返される TIME データの形式を指定できます。

このパラメーターの可能な値は、 時刻形式 で指定された時刻形式の可能な値です。

このパラメーターは出力のみに影響し、入力には影響しません。

partition_size_in_mb

このパラメーターは、クエリ結果セットが非常に大きく、複数の DataFrame パーティションに分割する必要がある場合に使用されます。このパラメーターは、各 DataFrame パーティションの推奨される非圧縮サイズを指定します。パーティションの数を減らすには、このサイズを大きくします。

このサイズは推奨サイズとして使用されます。パーティションの実際のサイズはこれより小さくても大きくてもかまいません。

このオプションは、 use_copy_unload パラメーターが FALSE の場合にのみ適用されます。

このパラメーターはオプションです。

デフォルト値は 100 (MB)です。

use_copy_unload

これが FALSE の場合、Snowflakeは SELECTing データのときにArrowデータ形式を使用します。これが TRUE に設定されている場合、Snowflakeは、選択したデータを送信するために COPY UNLOAD コマンドを使用する古い動作に戻ります。

このパラメーターはオプションです。

デフォルト値は FALSE です。

キーペア認証の使用

Snowflakeは一般的なユーザー名/パスワード認証ではなく、キーペア認証の使用をサポートしています。この認証方法には、2048ビット(最小)の RSA キーペアが必要です。 OpenSSLを使用して公開キーと秘密キーのペアを生成します。公開キーは、Snowflakeクライアントを使用するSnowflakeユーザーに割り当てられます。

公開/秘密キーペアを構成するには:

  1. ターミナルウィンドウのコマンドラインから、秘密キーを生成します。

    秘密キーは、暗号化バージョンまたは非暗号化バージョンのいずれかを生成できます。

    非暗号化バージョンを生成するには、次のコマンドを使用します。

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
    

    暗号化バージョンを生成するには、次の(「-nocrypt」を省略する)コマンドを使用します。

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
    

    通常は、暗号化されたバージョンを生成する方が安全です。

    2番目のコマンドを使用して秘密キーを暗号化する場合、 OpenSSL は秘密キーファイルの暗号化に使用されるパスフレーズの入力を求めます。強力なパスフレーズを使用して秘密キーを保護することをお勧めします。このパスフレーズを安全な場所に記録します。Snowflakeに接続するときに入力します。パスフレーズは秘密キーの保護にのみ使用され、Snowflakeには送信されないことに注意してください。

    サンプル PEM 秘密キー

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. コマンドラインから、秘密キーを参照して公開キーを生成します:

    秘密キーが暗号化され、「rsa_key.p8」という名前のファイルに含まれていると仮定して、次のコマンドを使用します:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    サンプル PEM 公開キー

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. 公開キーファイルと秘密キーファイルを保存用のローカルディレクトリにコピーします。ファイルへのパスを記録します。秘密キーは PKCS#8(公開キー暗号化標準)形式を使用して格納され、前の手順で指定したパスフレーズを使用して暗号化されることに注意してください。ただし、オペレーティングシステムが提供するファイル許可メカニズムを使用して、ファイルを不正アクセスから保護する必要があります。ファイルが使用されていない場合、ファイルを保護するのはユーザーの責任です。

  4. ALTER USER を使用して、Snowflakeユーザーに公開キーを割り当てます。例:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    注釈

    • ユーザーを変更できるのは、セキュリティ管理者(つまり、 SECURITYADMIN ロールのユーザー)以上のみです。

    • SQL ステートメントで公開キーのヘッダーとフッターを除外します。

    DESCRIBE USER を使用してユーザーの公開キーの指紋を検証します:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    注釈

    RSA_PUBLIC_KEY_2_FP プロパティについては、このトピックの キーローテーション で説明しています。

  5. pem_private_key 接続オプションを使用して、プライベートキーの 非暗号化 コピーを送信します。

注意

セキュリティ上の理由から、アプリケーションで pem_private_key をハードコーディングするのではなく、安全なソースからキーを読み取った後、パラメーターを動的に設定する必要があります。キーが暗号化されている場合は、キーを復号化し、復号化したバージョンを送信します。

Pythonの例では、 pem_private_key ファイル、 rsa_key.p8 は次のとおりです。

  • 環境変数 PRIVATE_KEY_PASSPHRASE を使用して、パスワードで保護されたファイルから直接読み取られます。

  • sfOptions 文字列で式 pkb を使用します。

接続するには、Pythonの例をファイル(つまり、 <file.py>)に保存してから、次のコマンドを実行します。

spark-submit --packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4 <file.py>

Python

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os

with open("<path>/rsa_key.p8", "rb") as key_file:
  p_key = serialization.load_pem_private_key(
    key_file.read(),
    password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
    backend = default_backend()
    )

pkb = p_key.private_bytes(
  encoding = serialization.Encoding.PEM,
  format = serialization.PrivateFormat.PKCS8,
  encryption_algorithm = serialization.NoEncryption()
  )

pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')

sfOptions = {
  "sfURL" : "<account_name>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "pem_private_key" : pkb,
  "sfDatabase" : "<database>",
  "sfSchema" : "schema",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "COLORS") \
    .load()

df.show()

キーローテーション

Snowflakeは、複数のアクティブキーをサポートして、連続したローテーションを可能にします。内部的に従う有効期限のスケジュールに基づいて、公開キーと秘密キーをローテーションして交換します。

現在、 ALTER USERRSA_PUBLIC_KEY および RSA_PUBLIC_KEY_2 パラメーターを使用して、最大2個の公開キーを1人のユーザーに関連付けることができます。

キーをローテーションするには:

  1. キーペア認証の使用 の手順を完了して:

    • 新しい秘密キーと公開キーのセットを生成します。

    • ユーザーに公開キーを割り当てます。公開キーの値を RSA_PUBLIC_KEY または RSA_PUBLIC_KEY_2 (現在使用されていないキーの値)に設定します。例:

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Snowflakeに接続するようにコードを更新します。新しい秘密キーを指定します。

    Snowflakeは、接続情報とともに送信された秘密キーに基づいて、認証用の正しいアクティブな公開キーを検証します。

  3. ユーザープロファイルから古い公開キーを削除します。例:

    alter user jsmith unset rsa_public_key;
    

外部 OAuth を使用する

Sparkコネクタバージョン2.7.0以降から、 外部 OAuth を使用して、サンプルScalaプログラムまたはサンプルPythonスクリプトを使用してSnowflakeを認証できます。

外部 OAuth とSparkコネクタを使用してSnowflakeを認証する前に、サポートされている外部 OAuth 認証サーバーの1つ、または外部 OAuth カスタムクライアント の外部 OAuth セキュリティ統合を構成します。

ScalaおよびPythonの例では、 sfPassword パラメーターが、 sfAuthenticator および sfToken パラメーターに置き換えられていることに注意してください。

Scala:

// spark connector version

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.DataFrame

var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<username>",
    "sfAuthenticator" -> "oauth",
    "sfToken" -> "<external_oauth_access_token>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "region")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Python:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<YOUR_APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<YOUR_AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<YOUR_AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_name>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfAuthenticator" : "oauth",
  "sfToken" : "<external_oauth_access_token>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

外部データ転送の AWS オプション

これらのオプションは、一時データが保存されるAmazon S3ロケーションを指定し、そのロケーションにアクセスするための認証の詳細を提供するために使用されます。これらは外部データ転送を行う場合 のみ 必要です。次のいずれかに該当する場合、外部データ転送が必要です:

  • バージョン2.1.x以下のSparkコネクタ(内部転送をサポートしない)を使用している、または

  • 転送には36時間以上かかる可能性があります(内部転送では、36時間後に期限が切れる一時的な認証情報を使用)。

tempDir

中間データが保存されるS3の場所(例: s3n://xy12345-bucket/spark-snowflake-tmp/)。

tempDir が指定されている場合、次のいずれかも指定する必要があります。

  • awsAccessKeyawsSecretKey . または

  • temporary_aws_access_key_idtemporary_aws_secret_access_keytemporary_aws_session_token

awsAccessKeyawsSecretKey

これらは、 tempDir で指定された場所へのアクセスを許可する標準の AWS 認証情報です。これらのオプションは両方とも一緒に設定する必要があることに注意してください。

設定されている場合、既存の SparkContext オブジェクトから取得できます。

これらの変数を指定する場合、 tempDir も指定する必要があります。

これらの認証情報は、Hadoopクラスターにも設定する必要があります。

temporary_aws_access_key_idtemporary_aws_secret_access_keytemporary_aws_session_token

これらは、 tempDir で指定された場所へのアクセスを許可する一時的な AWS 認証情報です。これらのオプションは3つとも一緒に設定する必要があることに注意してください。

また、これらのオプションが設定されている場合、 awsAccessKey および awsSecretKey オプションよりも優先されます。

temporary_aws_access_key_idtemporary_aws_secret_access_key、および temporary_aws_session_token を指定する場合、 tempDir も指定する必要があります。それ以外の場合、これらのパラメーターは無視されます。

check_bucket_configuration

on (デフォルト)に設定されている場合、コネクタはデータ転送に使用されるバケットにライフサイクルポリシーが構成されているかどうかを確認します(詳細については、 AWS 外部S3バケットの準備 を参照)。ライフサイクルポリシーが存在しない場合、警告がログに記録されます。

off に設定して)このオプションを無効にすると、このチェックはスキップされます。これは、ユーザーがバケットライフサイクルポリシーではなくバケットデータ操作にアクセスできる場合に役立ちます。このオプションを無効にすると、クエリの実行時間をわずかに短縮できます。

詳細については、 データ交換のためのS3の認証 (このトピック)をご参照ください。

外部データ転送のAzureオプション

このセクションでは、外部データ転送を行うときにAzure Blobストレージに適用されるパラメーターについて説明します。次のいずれかに該当する場合、外部データ転送が必要です。

  • バージョン2.1.x以下のSparkコネクタ(内部転送をサポートしない)を使用している、または

  • 転送には36時間以上かかる可能性があります(内部転送では、36時間後に期限が切れる一時的な認証情報を使用)。

Azure Blobストレージで外部転送を使用する場合、以下で説明するパラメーターを使用して、Azureコンテナの場所とそのコンテナの SAS (共有アクセス署名)を指定します。

tempDir

中間データが保存されるAzure Blobストレージコンテナ。これは URLの形式で、たとえば次のとおりです。

wasb://<azureコンテナ>@<azureアカウント>.<azureエンドポイント>/

temporary_azure_sas_token

Azure Blobストレージの SAS トークンを指定します。

詳細については、このトピックの データ交換のためのAzureの認証 をご参照ください。

Sparkの一時ストレージのAzure情報を指定する

Azure Blobストレージを使用して一時ストレージを提供し、SparkとSnowflakeの間でデータを転送する場合、SparkとSnowflake Sparkコネクタに一時ストレージの場所と認証情報を提供する必要があります。

Sparkに一時的な保存場所を提供するには、Sparkクラスターで次のようなコマンドを実行します。

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

最後のコマンドには次の変数が含まれていることに注意してください。

  • <コンテナ> および <アカウント> :Azureデプロイメントのコンテナとアカウント名です。

  • <Azureエンドポイント> :Azureデプロイメントの場所のエンドポイントです。たとえば、Azure US デプロイメントを使用している場合、エンドポイントは blob.core.windows.net である可能性があります。

  • <Azure SAS> :共有アクセス署名セキュリティトークンです。

これらの各変数をAzure BLOBストレージアカウントの適切な情報に置き換えます。

Snowflakeセッションパラメータをコネクタのオプションとして渡す

Spark用Snowflakeコネクタは、Snowflakeへの任意のセッションレベルのパラメータの送信をサポートしています(詳細については、 セッションパラメーター をご参照ください)。これは、 options オブジェクトに ("<キー>" -> "<値>") ペアを追加することで実現できます。 <キー> はセッションパラメーター名で、 <値> は値です。

注釈

<値> は、数値またはブール値(例: "1" または "true")を受け入れるパラメーターであっても、二重引用符で囲まれた文字列でなければなりません。

たとえば、次のコードサンプルは、 "false" の値を持つ USE_CACHED_RESULT セッションパラメーターを渡します。これにより、以前に実行されたクエリの結果の使用が無効になります。

// ... assuming sfOptions contains Snowflake connector options

// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method

セキュリティに関する考慮事項

お客様は、マルチノードのSparkシステムでは、ノード間の通信が安全であることを確認する必要があります。SparkマスターはSnowflake認証情報をSparkワーカーに送信し、それらのワーカーがSnowflakeステージにアクセスできるようにします。SparkマスターとSparkワーカー間の通信が安全でない場合、認証情報は不正な第三者によって読み取られる可能性があります。

データ交換のためのS3の認証

このセクションでは、データ交換にS3を使用する場合の認証方法について説明します。

このタスクは、次のいずれかの状況で のみ 必要です:

  • Snowflake Connector for Sparkバージョンは2.1.x(またはそれ以下)です。v2.2.0以降、コネクタはデータ交換にSnowflake内部の一時ステージを使用します。現在コネクタのバージョン2.2.0(またはそれ以上)を使用していない場合、Snowflakeは最新バージョンにアップグレードすることを強くお勧めします。

  • Snowflake Connector for Sparkバージョンは2.2.0(またはそれ以上)ですが、ジョブの長さは定期的に36時間を超えています。これは、データ交換のために内部ステージにアクセスするためにコネクタが使用する AWS トークンの最大期間です。

コネクタの古いバージョンを使用している場合、SnowflakeとSparkの間でデータを交換するためにコネクタが使用できるS3の場所を準備する必要があります。

SparkとSnowflakeの間でデータを交換するために使用されるS3バケット/ディレクトリ( tempDir で指定されている)へのアクセスを許可するために、2つの認証方法がサポートされています:

  • 永続的な AWS 認証情報(S3にアクセスするためのHadoop/Spark認証の構成にも使用)

  • 一時的な AWS 認証情報

永続的な AWS 認証情報の使用

これは標準の AWS 認証方法です。 awsAccessKeyawsSecretKey の値のペアが必要です。

注釈

これらの値は、S3にアクセスするためのHadoop/Sparkの構成にも使用する必要があります。例などの詳細については、 S3A または S3Nを使用したHadoop/Sparkの認証 (このトピック)をご参照ください。

例:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_name>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)

sfOptions でサポートされているオプションの詳細については、 外部データ転送の AWS オプション (このトピック)をご参照ください。

S3A または S3Nを使用したHadoop/Sparkの認証

Hadoop/Sparkエコシステムは、S3 にアクセスするための2つの URI スキーム を サポートしています

s3a://

新しい推奨方法(Hadoop 2.7以降の場合)

この方法を使用するには、このトピックのScalaの例を変更して、次のHadoop構成オプションを追加します。

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)

tempdir オプションも s3a:// を使用していることを確認してください。

s3n://

以前の方法(Hadoop 2.6以前の場合)

一部のシステムでは、次のScalaの例に示すように明示的に指定する必要があります:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)

一時的な AWS 認証情報の使用

このメソッドは、コネクタの temporary_aws_access_key_idtemporary_aws_secret_access_key、および temporary_aws_session_token 構成オプションを使用します。

この方法では、データ交換に使用されるS3バケット/ディレクトリへの一時的なアクセスのみをSnowflakeに提供することにより、セキュリティを強化できます。

注釈

一時的な認証情報は、コネクタのS3認証を構成するためにのみ使用できます。Hadoop/Spark認証の構成には使用できません。

また、一時的な認証情報を提供する場合、提供された永続的な認証情報よりも優先されます。

次のScalaコードサンプルは、一時的な認証情報を使用した認証の例を示しています。

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as temporary_aws_access_key_id are, of course,
// also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())

sfOptions2options() DataFrame メソッドで使用できるようになりました。

データ交換のためのAzureの認証

このセクションでは、データ交換にAzure Blobストレージを使用する場合の認証方法について説明します。

この方法の認証は、次のいずれかの 状況でのみ 必要です。

  • Snowflake Connector for Sparkバージョンは2.1.x(またはそれ以下)です。v2.2.0以降、コネクタはデータ交換にSnowflake内部の一時ステージを使用します。現在コネクタのバージョン2.2.0(またはそれ以上)を使用していない場合、Snowflakeは最新バージョンにアップグレードすることを強くお勧めします。

  • Snowflake Connector for Sparkバージョンは2.2.0(またはそれ以上)ですが、ジョブの長さは定期的に36時間を超えています。これは、データ交換のために内部ステージにアクセスするためにコネクタが使用するAzureトークンの最大期間です。

コネクタがSnowflakeとSparkの間でデータを交換するために使用できるAzure Blobストレージコンテナを準備する必要があります。

Azure認証情報を使用する

これは、標準のAzure Blobストレージ認証方法です。値のペアが必要です: tempDir ( URL)と temporary_azure_sas_token の値。

注釈

これらの値は、Azure BlobストレージにアクセスするためのHadoop/Sparkの構成にも使用する必要があります。例などの詳細については、 Azureを使用したHadoop/Sparkの認証 (このトピック)をご参照ください。

例:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

// Then, configure your Snowflake environment
//
val sfOptions = Map(
  "sfURL" -> "<account_name>.snowflakecomputing.com",
  "sfUser" -> "<user_name>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database_name>",
  "sfSchema" -> "<schema_name>",
  "sfWarehouse" -> "<warehouse_name>",
  "sfCompress" -> "on",
  "sfSSL" -> "on",
  "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/",
  "temporary_azure_sas_token" -> "<azure_sas>"
)

sfOptions でサポートされているオプションの詳細については、 外部データ転送のAzureオプション (このトピック)をご参照ください。

Azureを使用したHadoop/Sparkの認証

この方法を使用するには、このトピックのScalaの例を変更して、次のHadoop構成オプションを追加します。

val hadoopConf = sc.hadoopConfiguration
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

tempdir オプションも wasb:// を使用していることを確認してください。

ブラウザーを介した認証はサポートされていません

Sparkコネクタを使用する場合、ブラウザウィンドウを開いてユーザーに認証情報を要求する認証形式を使用することは実用的ではありません。ウィンドウは、必ずしもクライアントマシンに表示されるとは限りません。したがって、Sparkコネクタは、ブラウザウィンドウを呼び出す MFA (多要素認証)や SSO (シングルサインオン)などの認証の種類をサポートしていません。