Sparkコネクタの使用

コネクタは標準のSpark APIに準拠していますが、このトピックで説明するSnowflake固有のオプションが追加されています。

このトピックでは、用語 COPY は次の両方を指します:

  • COPY INTO <テーブル> (内部または外部のステージからテーブルにデータを転送するために使用されます)。

  • COPY INTO <場所> (テーブルから内部または外部ステージにデータを転送するために使用されます)。

このトピックの内容:

SnowCD を使用したSnowflakeへのネットワーク接続の検証

ドライバーを設定したら、 SnowCD を使用して、Snowflakeへのネットワーク接続を評価およびトラブルシューティングできます。

初期設定プロセス中にオンデマンドで SnowCD をいつでも使用して、Snowflakeへのネットワーク接続を評価およびトラブルシューティングできます。

プッシュダウン

Sparkコネクターは、 SQL 操作のSpark論理プランをキャプチャおよび分析することにより、述語とクエリプッシュダウンを適用します。データソースがSnowflakeの場合、操作は SQL クエリに変換され、Snowflakeで実行されてパフォーマンスが向上します。

ただし、この変換にはSpark SQL 演算子からSnowflake式へのほぼ1対1の変換が必要であるため、すべてのSpark SQL 演算子をプッシュダウンできるわけではありません。プッシュダウンが失敗すると、コネクタは最適化されていない実行プランにフォールバックします。サポートされていない操作は、代わりにSparkで実行されます。

注釈

すべての操作でプッシュダウンが必要な場合は、代わりに Snowpark API を使用するようにコードを作成することを検討してください。

以下は、プッシュダウンでサポートされている操作のリストです(以下のすべての関数は、Spark名を使用しています)。関数がこのリストにない場合、Snowflakeにプッシュダウンされるのではなく、それを利用するSparkプランがSparkで実行される場合があります。

  • 集計関数

    • Average

    • Corr

    • CovPopulation

    • CovSample

    • Count

    • Max

    • Min

    • StddevPop

    • StddevSamp

    • Sum

    • VariancePop

    • VarianceSamp

  • ブール演算子

    • And

    • Between

    • Contains

    • EndsWith

    • EqualTo

    • GreaterThan

    • GreaterThanOrEqual

    • In

    • IsNull

    • IsNotNull

    • LessThan

    • LessThanOrEqual

    • Not

    • Or

    • StartsWith

  • 日付、時刻、およびタイムスタンプ関数

    • DateAdd

    • DateSub

    • Month

    • Quarter

    • TruncDate

    • TruncTimestamp

    • Year

  • 数学関数

    • 算術演算子「+」(加算)、「-」(減算)、「*」(乗算)、「/」(除算)、および「-」(単項否定)。

    • Abs

    • Acos

    • Asin

    • Atan

    • Ceil

    • CheckOverflow

    • Cos

    • Cosh

    • Exp

    • Floor

    • Greatest

    • Least

    • Log

    • Pi

    • Pow

    • PromotePrecision

    • Rand

    • Round

    • Sin

    • Sinh

    • Sqrt

    • Tan

    • Tanh

  • その他の演算子

    • Alias(AS 式)

    • BitwiseAnd

    • BitwiseNot

    • BitwiseOr

    • BitwiseXor

    • CaseWhen

    • Cast(子, t, _)

    • Coalesce

    • If

    • MakeDecimal

    • ScalarSubquery

    • ShiftLeft

    • ShiftRight

    • SortOrder

    • UnscaledValue

  • 関係演算子

    • 集計関数とgroup-by句

    • Distinct

    • Filters

    • In

    • InSet

    • Joins

    • Limits

    • Projections

    • Sorts(ORDER BY)

    • UnionおよびUnion All

    • ウィンドウ関数とwindowing句

  • 文字列関数

    • Ascii

    • Concat(子)

    • Length

    • Like

    • Lower

    • StringLPad

    • StringRPad

    • StringTranslate

    • StringTrim

    • StringTrimLeft

    • StringTrimRight

    • Substring

    • Upper

  • ウィンドウ関数(注: これらはSpark 2.2では機能しません)

    • DenseRank

    • Rank

    • RowNumber

Scalaでコネクタを使用する

データソースクラス名の指定

SnowflakeをSparkのデータソースとして使用するには、 .format オプションを使用して、データソースを定義するSnowflakeコネクタクラス名を指定します。

net.snowflake.spark.snowflake

クラス名のコンパイル時チェックを確実にするために、Snowflakeはクラス名に変数を定義することを強くお勧めします。例:

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
Copy

また、便宜上 Utils クラスは変数を提供し、次のようにインポートできます。

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
Copy

注釈

このトピックのすべての例では、クラス定義として SNOWFLAKE_SOURCE_NAME を使用しています。

セッションでのプッシュダウンの有効化/無効化

コネクタのバージョン2.1.0(以降)はクエリプッシュダウンをサポートします。これは、SnowflakeがSparkデータソースである場合にクエリ処理をSnowflakeにプッシュすることにより、パフォーマンスを大幅に改善できます。

デフォルトでは、プッシュダウンは有効になっています。

特定の DataFrame のSparkセッション内でプッシュダウンを無効にするには、

  1. SparkSession オブジェクトをインスタンス化した後、 SnowflakeConnectorUtils.disablePushdownSession 静的メソッドを呼び出して SparkSession オブジェクトを渡します。例:

    SnowflakeConnectorUtils.disablePushdownSession(spark)
    
    Copy

    sparkSparkSession オブジェクトです。

  2. 自動プッシュダウン オプションを off に設定して DataFrame を作成します。例:

    val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME)
      .options(sfOptions)
      .option("query", query)
      .option("autopushdown", "off")
      .load()
    
    Copy

    options メソッドに渡す Map に、 autopushdown オプションを設定することもできます(例: 上記の例の sfOptions)。

プッシュダウンを無効にした後で再度有効にするには、 SnowflakeConnectorUtils.enablePushdownSession 静的メソッドを呼び出し(SparkSession オブジェクトを渡す)、 autopushdown を有効にして DataFrame を作成します。

SnowflakeからSparkへのデータの移動

注釈

DataFramesを使用する場合、Snowflakeコネクタは SELECT クエリのみをサポートします。

SnowflakeからSpark DataFrame にデータを読み込むには、

  1. read() メソッド( SqlContext オブジェクトの)を使用して DataFrameReader を構成します。

  2. format() メソッドを使用して、 SNOWFLAKE_SOURCE_NAME を指定します。定義については、 データソースクラス名の指定 (このトピック)をご参照ください。

  3. option() または options() メソッドを使用して、コネクタオプションを指定します。詳細については、 コネクタの構成オプションの設定 (このトピック)をご参照ください。

  4. 読み取るテーブルデータに次のオプションのいずれかを指定します。

    • dbtable :読み取るテーブルの名前。すべての列と記録が取得されます(つまり、 SELECT * FROM db_table と同等)。

    • query :実行する正確なクエリ(SELECT ステートメント)。

使用上の注意

  • 現在、 DataFrames を使用するときに、コネクタは他の種類のクエリ(例: SHOW 、 DESC、または DML ステートメント)をサポートしません。

  • 個々の行のサイズには上限があります。詳細については、 クエリテキストサイズの制限 をご参照ください。

パフォーマンスの考慮事項

SnowflakeとSparkの間でデータを転送する場合、次の方法を使用してパフォーマンスを分析/改善します。

  • net.snowflake.spark.snowflake.Utils.getLastSelect() メソッドを使用して、SnowflakeからSparkにデータを移動するときに発行される実際のクエリを確認します。

  • Spark DataFrameの filter または where 機能を使用する場合、発行された SQL クエリにそれぞれのフィルターが存在することを確認してください。Snowflakeコネクタは、Sparkによって要求されたすべてのフィルターを SQLに変換しようとします。

    ただし、現在のSparkインフラストラクチャがSnowflakeコネクタに渡さないフィルターの形式があります。その結果、状況によっては、Snowflakeから多数の不要なレコードが要求されます。

  • 列のサブセットのみが必要な場合は、 SQL クエリでサブセットが反映されていることを確認してください。

  • 一般に、発行された SQL クエリが DataFrame 操作に基づいて期待するものと一致しない場合、 query オプションを使用して、希望する正確な SQL 構文を提供します。

テーブル全体を読む:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()
Copy

クエリの結果を読み取ります:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
    .load()
Copy

SparkからSnowflakeへのデータの移動

DataFrame の内容をSnowflakeのテーブルに保存する手順は、SnowflakeからSparkへの書き込みと類似しています。

  1. DataFramewrite() メソッドを使用して DataFrameWriter を構成します。

  2. format() メソッドを使用して、 SNOWFLAKE_SOURCE_NAME を指定します。定義については、 データソースクラス名の指定 (このトピック)をご参照ください。

  3. option() または options() メソッドを使用して、コネクタオプションを指定します。詳細については、 コネクタの構成オプションの設定 (このトピック)をご参照ください。

  4. データを書き込むテーブルを指定するには、 dbtable オプションを使用します。

  5. コンテンツの保存モードを指定するには、 mode() メソッドを使用します。

    詳細については、 SaveMode (Sparkドキュメント)をご参照ください。

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

SparkからSnowflakeへの JSON のエクスポート

Spark DataFrames には、文字列としてシリアル化された JSON オブジェクトを含めることができます。次のコードは、通常の DataFrame を JSON データを含む DataFrame に変換する例を示しています。

val rdd = myDataFrame.toJSON
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)
Copy

結果の jsonDataFrame には、タイプ StringType の単一列が含まれています。その結果、この DataFrame が共通の SaveMode.Overwrite モードでSnowflakeにエクスポートされると、新しい表がSnowflakeにタイプ VARCHAR の単一列で作成されます。

jsonDataFrameVARIANT 列にロードするには、

  1. Snowflakeテーブルを作成します(Snowflake JDBC ドライバーを使用してJavaでSnowflakeに接続)。例で使用されている接続パラメーターの説明については、 JDBC ドライバーの接続パラメーター参照 をご参照ください。

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class SnowflakeJDBCExample {
      public static void main(String[] args) throws Exception {
        String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.com/";
    
        Properties properties = new Properties();
        properties.put("user", "peter");
        properties.put("password", "test");
        properties.put("account", "myorganization-myaccount");
        properties.put("warehouse", "mywh");
        properties.put("db", "mydb");
        properties.put("schema", "public");
    
        // get connection
        System.out.println("Create JDBC connection");
        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
        System.out.println("Done creating JDBC connection\n");
        // create statement
        System.out.println("Create JDBC statement");
        Statement statement = connection.createStatement();
        System.out.println("Done creating JDBC statement\n");
        // create a table
        System.out.println("Create my_variant_table table");
        statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
        statement.close();
        System.out.println("Done creating demo table\n");
    
        connection.close();
        System.out.println("Close connection\n");
      }
    }
    
    Copy
  2. 既存のテーブルを再利用する場合は、 SaveMode.Overwrite の代わりに、 SaveMode.Append を使用します。 JSON を表す文字列値がSnowflakeにロードされると、ターゲット列が VARIANT型であるため、 JSONとして解析されます。例:

    df.write
        .format(SNOWFLAKE_SOURCE_NAME)
        .options(sfOptions)
        .option("dbtable", "my_variant_table")
        .mode(SaveMode.Append)
        .save()
    
    Copy

DDL/DML SQL ステートメントの実行

クエリに加えて、 runQuery() メソッド( Utils オブジェクトの)を使用して、 DDL/DML SQL ステートメントを実行します。例:

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
    )
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")
Copy

sfOptions は DataFramesの読み書きに使用されるパラメーターマップです。

runQuery メソッドは TRUE または FALSE のみを返します。結果セットを返さないステートメント、例えば DDL ステートメント(CREATE TABLE のような)、および DML ステートメント( INSERTUPDATEDELETE のような)を対象としています。 SELECTSHOW などの結果セットを返すステートメントでは有効ではありません。

タイムスタンプとタイムゾーンの操作

Sparkでは、Scala/Java Timestamp型と同等のタイムスタンプ1種類のみを提供します。Snowflakeの TIMESTAMP_LTZ (現地時間帯)データ型の動作とほぼ同様です。そのため、SparkとSnowflakeの間でデータを転送するときは、タイムゾーンに準じた時間を正しく保つために、次の方法を使用することをお勧めします。

  • Snowflakeでは、 TIMESTAMP_LTZ データ型のみを使用します。

    注釈

    デフォルトのタイム・スタンプ・データ・タイプ・マッピングは TIMESTAMP_NTZ (タイムゾーンなし)であるため、 必ず TIMESTAMP_TYPE_MAPPING パラメーターにより TIMESTAMP_LTZ の使用を明示的に設定します。

  • Sparkのタイムゾーンを UTC に設定し、Snowflakeでこのタイムゾーンを使用します(例:コネクタに sfTimezone オプションを設定しないでください。また、Snowflakeでタイムゾーンを明示的に設定しないでください)。このシナリオでは、 TIMESTAMP_LTZ と TIMESTAMP_NTZ は事実上同等です。

    タイムゾーンを設定するには、Sparkコードに次の行を追加します:

    java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
    
    Copy

これらのアプローチのいずれも実装しない場合、望ましくない時間変更が発生する可能性があります。たとえば、次のシナリオを考えます:

  • Sparkのタイムゾーンは America/New_York に設定されています。

  • Snowflakeのタイムゾーンは Europe/Warsaw に設定されています。これは次のいずれかの方法で発生します。

    • コネクタの sfTimezoneEurope/Warsaw に設定しています。

    • コネクタの sfTimezonesnowflake に設定し、Snowflakeの TIMEZONE セッションパラメータを Europe/Warsaw に設定します。

  • Snowflakeでは TIMESTAMP_NTZ と TIMESTAMP_LTZ の両方が使用されています。

このシナリオでは、

  1. Snowflakeの TIMESTAMP_NTZ 列で 12:00:00 を表す値がSparkに送信された場合、この値にはタイムゾーン情報は含まれません。Sparkはこの値、ニューヨークの 12:00:00 として扱います。

  2. Sparkがこの値 12:00:00 (ニューヨーク)をSnowflakeに送り返して TIMESTAMP_LTZ 列にロードすると、自動的に変換され、 18:00:00 (ワルシャワタイムゾーン用)としてロードされます。

  3. この値がSnowflakeで TIMESTAMP_NTZ に変換されると、ユーザーには 18:00:00 が表示されますが、これは元の値 12:00:00 とは異なります。

要約すると、これらのルールの少なくとも1つを 少なくとも 厳守することをお勧めします。

  • SparkとSnowflakeの両方に、 同じ タイムゾーン、理想的には UTC を使用します。

  • SparkとSnowflakeの間でデータを転送するには、 TIMESTAMP_LTZ データ型 のみ を使用します。

サンプルScalaプログラム

重要

このサンプルプログラムは、バージョン2.2.0(またはそれ以上)のコネクタを使用していることを前提としています。Snowflake内部ステージを使用して一時データを保存するため、一時データの保存にS3の場所は必要ありません。以前のバージョンを使用している場合は、既存のS3の場所があり、 tempdirawsAccessKeyawsSecretKey の値( sfOptions の)を含める必要があります。詳細については、 外部データ転送の AWS オプション (このトピック)をご参照ください。

次のScalaプログラムは、Spark用Snowflakeコネクタの完全な使用例を提供します。コードを使用する前に、 コネクタの構成オプションの設定 (このトピック内)で説明されているように、次の文字列を適切な値に置き換えます。

  • <アカウント識別子>: 使用する アカウント識別子

  • <ユーザー名><パスワード>: Snowflakeユーザーのログイン認証情報。

  • <データベース><スキーマ><ウェアハウス> :Snowflakeセッションのデフォルト。

サンプルのScalaプログラムは、基本認証(つまり、ユーザー名とパスワード)を使用します。 OAuth で認証する場合は、 外部 OAuth の使用(このトピック内)をご参照ください。

import org.apache.spark.sql._

//
// Configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) from t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Pythonでのコネクタの使用

Pythonでコネクタを使用することは、Scalaの使用法に非常に似ています。

Sparkディストリビューションに含まれている bin/pyspark スクリプトを使用することをお勧めします。

pyspark スクリプトの構成

pyspark スクリプトを spark-shell スクリプトと同様に構成する必要があります( --packages または --jars オプションを使用)。例:

bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3
Copy

Snowflake Sparkコネクタおよび JDBC コネクタ.jarファイルを CLASSPATH 環境変数に含めることを忘れないでください。

spark-shell スクリプトの構成の詳細については、 ステップ4:ローカルSparkクラスターまたはAmazon EMR がホストするSpark環境を構成する をご参照ください。

セッションでのプッシュダウンの有効化/無効化

コネクタのバージョン2.1.0(以降)はクエリプッシュダウンをサポートします。これは、SnowflakeがSparkデータソースである場合にクエリ処理をSnowflakeにプッシュすることにより、パフォーマンスを大幅に改善できます。

デフォルトでは、プッシュダウンは有効になっています。

特定の DataFrame のSparkセッション内でプッシュダウンを無効にするには、

  1. SparkSession オブジェクトをインスタンス化した後、 SnowflakeConnectorUtils.disablePushdownSession 静的メソッドを呼び出して SparkSession オブジェクトを渡します。例:

    sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
    
    Copy
  2. 自動プッシュダウン オプションを off に設定して DataFrame を作成します。例:

    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sfOptions) \
      .option("query",  query) \
      .option("autopushdown", "off") \
      .load()
    
    Copy

    options メソッドに渡す Dictionary に、 autopushdown オプションを設定することもできます(例: 上記の例の sfOptions)。

プッシュダウンを無効にした後で再度有効にするには、 SnowflakeConnectorUtils.enablePushdownSession 静的メソッドを呼び出し(SparkSession オブジェクトを渡す)、 autopushdown を有効にして DataFrame を作成します。

サンプルPythonスクリプト

重要

このサンプルスクリプトは、バージョン2.2.0(またはそれ以上)のコネクタを使用していることを前提としています。Snowflake内部ステージを使用して一時データを保存するため、このデータの保存にS3の場所は必要ありません。以前のバージョンを使用している場合は、既存のS3の場所があり、 tempdirawsAccessKeyawsSecretKey の値( sfOptions の)を含める必要があります。詳細については、 外部データ転送の AWS オプション (このトピック)をご参照ください。

pyspark スクリプトを構成したら、 SQL クエリおよびその他の操作を実行できます。以下は、単純な SQL クエリを実行するPythonスクリプトの例です。このスクリプトは、基本的なコネクタの使用法を示しています。このドキュメントのScalaの例のほとんどは、Pythonで使用するための最小限の労力/変更で適応できます。

サンプルPythonスクリプトは、基本認証(つまり、ユーザー名とパスワード)を使用します。 OAuth で認証する場合は、 外部 OAuth の使用(このトピック内)をご参照ください。

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()
Copy

ちなみに

sfOptionsSNOWFLAKE_SOURCE_NAME の使用に注意してください。これにより、コードが簡素化され、エラーが発生する可能性が低くなります。

sfOptions でサポートされているオプションの詳細については、 コネクタの構成オプションの設定 (このトピック)をご参照ください。

データ型マッピング

Sparkコネクタは、多くの一般的なデータ型間の変換をサポートしています。

Spark SQL からSnowflakeへ

Spark データ タイプ

Snowflake データ タイプ

ArrayType

VARIANT

BinaryType

サポート対象外

BooleanType

BOOLEAN

ByteType

INTEGER.Snowflakeは BYTE タイプをサポートしていません。

DateType

DATE

DecimalType

DECIMAL

DoubleType

DOUBLE

FloatType

FLOAT

IntegerType

INTEGER

LongType

INTEGER

MapType

VARIANT

ShortType

INTEGER

StringType

長さが指定されている場合 VARCHAR(N)、そうでない場合 VARCHAR

StructType

VARIANT

TimestampType

TIMESTAMP

SnowflakeからSpark SQLまで

Snowflake データ タイプ

Spark データ タイプ

ARRAY

StringType

BIGINT

DecimalType(38, 0)

BINARY

サポート対象外

BLOB

サポート対象外

BOOLEAN

BooleanType

CHAR

StringType

CLOB

StringType

DATE

DateType

DECIMAL

DecimalType

DOUBLE

DoubleType

FLOAT

DoubleType

INTEGER

DecimalType(38, 0)

OBJECT

StringType

TIMESTAMP

TimestampType

TIME

StringType (Sparkコネクタバージョン2.4.14以降)

VARIANT

StringType

DataFrame.showメソッドの呼び出し

DataFrame.show メソッドを呼び出し、 DataFrame の行数よりも少ない数を渡す場合は、並べ替えられた順序で表示する行のみを含む DataFrame を作成します。

これを実行するには、

  1. 最初に sort メソッドを呼び出して、並べ替えられた行を含む DataFrame を返します。

  2. その DataFrame で limit メソッドを呼び出し、表示する行だけを含む DataFrame を返します。

  3. 返された DataFrame で show メソッドを呼び出します。

たとえば、5行を表示し、結果を列 my_col で並べ替える場合は、

val dfWithRowsToShow = originalDf.sort("my_col").limit(5)
dfWithRowsToShow.show(5)
Copy

それ以外では、 show を呼び出して DataFrame の行のサブセットを表示すると、コードの実行が異なる場合に、異なる行が表示される可能性があります。

コネクタの構成オプションの設定

次のセクションでは、コネクタの動作を構成するために設定するオプションのリストを示します。

これらのオプションを設定するには、 Spark DataframeReader クラスの .option(< キー >, < >)、または .options(< マップ >) メソッドを呼び出します。

ちなみに

オプションの使用を容易にするために、単一の Map オブジェクトでオプションを指定し、 .options(< マップ >) を呼び出してオプションを設定することをお勧めします。

必要な接続オプション

Snowflakeに接続するには、次のオプションが必要です。

sfUrl

アカウントの ホスト名 を次の形式で指定します。

account_identifier.snowflakecomputing.com

account_identifier は、 使用する アカウント識別子 です。

sfUser

Snowflakeユーザーのログイン名。

認証には、次のいずれかのオプションも使用する必要があります。

  • sfPassword

    Snowflakeユーザーのパスワード。

  • pem_private_key

    キーペア認証用のシークレットキー( PEM 形式)。手順については、 キーペア認証とキーペアローテーション をご参照ください。

  • sfAuthenticator

    Snowflakeへの認証に 外部 OAuth を使用することを指定します。値を oauth に設定します。

    外部 OAuth を使用するには、 sfToken パラメーターを設定する必要があります。

sfToken

外部 OAuth を使用している場合には、値を外部 OAuth アクセストークンに設定する必要があります。

この接続パラメーターでは、 sfAuthenticator パラメーター値を oauth に設定する必要があります。

デフォルトはありません。

必要なコンテキストオプション

セッションのデータベースとスキーマコンテキストを設定するには、次のオプションが必要です:

sfDatabase

接続後にセッションに使用するデータベース。

sfSchema

接続後にセッションに使用するスキーマ。

その他のコンテキストオプション

このセクションにリストされているオプションは必須ではありません。

sfAccount

アカウント識別子(例: myorganization-myaccount)。アカウント識別子が sfUrl で指定されているため、このオプションは不要になりました。ここでは、下位互換性のためにのみ文書化されています。

sfWarehouse

接続後にセッションに使用するデフォルトの仮想ウェアハウス。

sfRole

接続後にセッションに使用するデフォルトのセキュリティロール。

プロキシオプション

このセクションにリストされているオプションは必須ではありません。

use_proxy

コネクタが、プロキシを使用するかどうかを指定します。

  • true はコネクタがプロキシを使用する必要があることを指定します。

  • false はコネクタがプロキシを使用する必要がないことを指定します。

デフォルト値は false です。

proxy_host

use_proxytrue の場合に必須)使用するプロキシサーバーのホスト名を指定します。

proxy_port

use_proxytrue の場合に必須)使用するプロキシサーバーのポート番号を指定します。

proxy_protocol

プロキシサーバーへの接続に使用するプロトコルを指定します。次の値のいずれかを指定できます。

  • http

  • https

デフォルト値は http です。

これは、AWS のSnowflakeでのみサポートされています。

このオプションは、Sparkコネクタのバージョン2.11.1で追加されました。

proxy_user

プロキシサーバーへの認証用のユーザー名を指定します。プロキシサーバーが認証を必要とする場合はこの設定を行います。

これは、AWS のSnowflakeでのみサポートされています。

proxy_password

プロキシサーバーへの認証用の proxy_user のパスワードを指定します。プロキシサーバーが認証を必要とする場合はこの設定を行います。

これは、AWS のSnowflakeでのみサポートされています。

non_proxy_hosts

プロキシサーバーをバイパスして、コネクタが直接接続する必要があるホストのリストを指定します。

ホスト名は URL をエスケープしたパイプ記号(%7C)で区切ります。アスタリスク(*)をワイルドカードとして使用することもできます。

これは、AWS のSnowflakeでのみサポートされています。

追加オプション

このセクションにリストされているオプションは必須ではありません。

sfTimezone

Sparkでの作業時にSnowflakeが使用するタイムゾーン。パラメーターはSnowflakeのタイムゾーンのみを設定することに注意してください。Spark環境は変更されません。サポートされている値は次のとおりです。

  • spark :Sparkのタイムゾーンを使用します(デフォルト)。

  • snowflake :Snowflakeの現在のタイムゾーンを使用します。

  • sf_default :接続しているSnowflakeユーザーのデフォルトのタイムゾーンを使用します。

  • time_zone: 有効な場合、特定のタイムゾーン(例: America/New_York)を使用します。

    このオプションの設定の影響の詳細については、 タイムスタンプとタイムゾーンの操作 (このトピック内)をご参照ください。

sfCompress

on (デフォルト)に設定すると、SnowflakeとSparkの間で渡されるデータが圧縮されます。

s3MaxFileSize

SnowflakeからSparkにデータを移動するときに使用されるファイルのサイズ。デフォルトは 10MB です。

preactions

SparkとSnowflakeの間でデータが転送される前に実行される SQL コマンドのセミコロン区切りリスト。

SQL コマンドに %s が含まれる場合、 %s は操作で参照されるテーブル名に置き換えられます。

postactions

SparkとSnowflakeの間でデータが転送される後に実行される SQL コマンドのセミコロン区切りリスト。

SQL コマンドに %s が含まれる場合、これは操作で参照されるテーブル名に置き換えられます。

truncate_columns

on (デフォルト)に設定すると、 COPY コマンドはターゲット列の長さを超えるテキスト文字列を自動的に切り捨てます。 off に設定した場合、ロードされた文字列がターゲット列の長さを超えると、コマンドはエラーを生成します。

truncate_table

このパラメーターは、そのテーブルを上書きするときに、SnowflakeがSnowflakeターゲットテーブルのスキーマを保持するかどうかを制御します。

デフォルトでは、Snowflakeのターゲットテーブルが上書きされると、そのターゲットテーブルのスキーマも上書きされます。新しいスキーマは、ソーステーブル(Sparkデータフレーム)のスキーマに基づいています。

ただし、ソースのスキーマが理想的でない場合があります。例えば、ユーザーは、最初のソース列のデータ型が INTEGERであっても、Snowflakeターゲットテーブルが将来 FLOAT 値を格納できるようにしたい場合があります。その場合は、Snowflakeテーブルのスキーマを上書きしないでください。Snowflakeテーブルは単に切り捨ててから、現在のスキーマで再利用する必要があります。

このパラメーターの可能な値は次のとおりです:

  • on

  • off

このパラメーターが on の場合、ターゲットテーブルの元のスキーマが保持されます。このパラメーターが off の場合、テーブルの古いスキーマは無視され、ソースのスキーマに基づいて新しいスキーマが生成されます。

このパラメーターはオプションです。

このパラメーターのデフォルト値は off です(つまり、デフォルトでは元のテーブルスキーマが上書きされます)。

Sparkデータ型からSnowflakeデータ型へのマッピング(およびその逆)の詳細については、 データ型マッピング (このトピック)をご参照ください。

continue_on_error

この変数は、ユーザーが無効なデータ(例えば、バリアントデータ型列の無効な JSON 形式)を入力した場合に COPY コマンドを中止するかどうかを制御します。

可能な値は次のとおりです:

  • on

  • off

on は、エラーが発生しても続行することを意味します。値 off は、エラーが発生した場合に中止することを意味します。

このパラメーターはオプションです。

このパラメーターのデフォルト値は off です。

このオプションをオンにすることはお勧めしません。Sparkコネクタを使用して COPYing からSnowflakeにエラーが報告された場合、データが失われる可能性があります。

注釈

行が拒否されたまたは欠落している場合、入力ソースでそれらの行に明らかに欠陥がない場合は、Snowflakeに報告してください。

usestagingtable

このパラメーターは、データのロードでステージングテーブルを使用するかどうかを制御します。

ステージングテーブルは、コネクタによって作成される通常のテーブル(一時的な名前を持つ)です。データのロード操作が成功すると、元のターゲットテーブルが削除され、ステージングテーブルの名前が元のターゲットテーブルの名前に変更されます。データのロード操作が失敗した場合、ステージングテーブルは削除され、ターゲットテーブルには操作の直前に保持していたデータが残ります。したがって、ステージングテーブルを使用すると、操作が失敗した場合に元のターゲットテーブルデータを保持できます。安全のため、Snowflakeはほとんどの状況でステージングテーブルを使用することを強くお勧めします。

コネクタがステージングテーブルを作成するには、Sparkコネクタを介して COPY を実行するユーザーに、テーブルを作成するための十分な権限が必要です。ユーザーがテーブルを作成する権限を持っていない場合、直接ロード(つまり、ステージングテーブルを使用しない読み込み)は便利です。

このパラメーターの可能な値は次のとおりです:

  • on

  • off

パラメーターが on の場合、ステージングテーブルが使用されます。このパラメーターが off の場合、データはターゲットテーブルに直接ロードされます。

このパラメーターはオプションです。

このパラメーターのデフォルト値は on です(つまり、ステージングテーブルを使用します)。

autopushdown

このパラメーターは、自動クエリプッシュダウンを有効にするかどうかを制御します。

プッシュダウンが有効になっている場合、クエリがSparkで実行されるときに、クエリの一部をSnowflakeサーバーに「プッシュダウン」できる場合、プッシュダウンされます。これにより、一部のクエリのパフォーマンスが向上します。

このパラメーターはオプションです。

コネクタがSparkの互換バージョンに接続されている場合、デフォルト値は on です。それ以外の場合、デフォルト値は off です。

コネクタが意図されているものとは異なるバージョンのSparkに接続されている場合(例: コネクタのバージョン3.2がSparkのバージョン3.3に接続されている場合)、このパラメーターが on に設定されていても自動プッシュダウンは無効になります。

purge

これが on に設定されている場合、コネクタは外部データ転送を介してSparkからSnowflakeに転送するときに作成された一時ファイルを削除します。このパラメーターが off に設定されている場合、これらのファイルはコネクターによって自動的に削除されません。

パージは、SparkからSnowflakeへの転送に対してのみ機能し、SnowflakeからSparkへの転送には機能しません。

可能な値は次のとおりです

  • on

  • off

デフォルト値は off です。

columnmap

このパラメーターは、SparkからSnowflakeにデータを書き込み、Snowflakeテーブルの列名がSparkテーブルの列名と一致しない場合に役立ちます。各Snowflake宛先列に対応するSparkソース列を示すマップを作成できます。

パラメータは、次の形式の単一の文字列リテラルです。

"Map(col_2 -> col_b, col_3 -> col_a)"

たとえば、次のシナリオを考えます:

  • Sparkの df という名前のデータフレームには3つの列があります:

    col_1col_2col_3

  • Snowflakeの tb という名前のテーブルには2つの列があります:

    col_acol_b

  • 次の値をコピーします:

    • df.col_2 から tb.col_b に。

    • df.col_3 から tb.col_a に。

columnmap パラメーターの値は次のとおりです。

Map(col_2 -> col_b, col_3 -> col_a)

次のScalaコードを実行して、この値を生成できます。

Map("col_2"->"col_b","col_3"->"col_a").toString()

このパラメーターのデフォルト値はnullです。つまり、デフォルトでは、ソース表と宛先表の列名は一致する必要があります。

このパラメーターは、SparkからSnowflakeに書き込むときにのみ使用されます。SnowflakeからSparkに書き込む場合は適用されません。

keep_column_case

SparkからSnowflakeにテーブルを書き込む場合、Sparkコネクタはデフォルトで、列名が二重引用符で囲まれていない限り、列名の文字を大文字にシフトします。

SnowflakeからSparkにテーブルを書き込む場合、Sparkコネクタはデフォルトで、大文字、アンダースコア、数字を除くすべての文字を含む列名を二重引用符で囲みます。

keep_column_caseを on に設定すると、Sparkコネクタはこれらの変更を行いません。

可能な値は次のとおりです:

  • on

  • off

デフォルト値は off です。

column_mapping

コネクタは、Sparkデータフレームの列をSnowflakeテーブルにマップする必要があります。これは、列名に基づいて(順序に関係なく)、または列の順序に基づいて実行できます(つまり、データフレームの最初の列は、列名に関係なく、テーブルの最初の列にマップされます)。

デフォルトでは、マッピングは順序に基づいて行われます。このパラメーターを name に設定することでオーバーライドできます。これは、コネクタに列名に基づいて列をマップするよう指示します。(名前のマッピングでは大文字と小文字は区別されません。)

このパラメーターの可能な値は次のとおりです:

  • order

  • name

デフォルト値は order です。

column_mismatch_behavior

このパラメーターは、 column_mapping パラメーターが name に設定されている場合にのみ適用されます。

Sparkデータフレームの列名とSnowflakeテーブルが一致しない場合、

  • column_mismatch_behaviorerror の場合、Sparkコネクタはエラーを報告します。

  • column_mismatch_behaviorignore の場合、Sparkコネクタはエラーを無視します。

    • ドライバーは、Snowflakeテーブルに対応する列がないSparkデータフレームの列をすべて破棄します。

    • ドライバーは、Sparkデータフレームに対応する列がないSnowflakeテーブルの列に NULLs を挿入します。

潜在的なエラーは次のとおりです:

  • Sparkデータフレームには、大文字と小文字を除いて同一の列を含めることができます。列名のマッピングでは大文字と小文字が区別されないため、データフレームからテーブルへの正しいマッピングを決定することはできません。

  • Snowflakeテーブルには、大文字と小文字を除いて同一の列を含めることができます。列名のマッピングでは大文字と小文字が区別されないため、データフレームからテーブルへの正しいマッピングを決定することはできません。

  • SparkデータフレームとSnowflakeテーブルには、共通の列名がない場合があります。理論的には、Sparkコネクタはすべての行のすべての列に NULLs を挿入できますが、これは通常無意味であるため、 column_mismatch_behaviorignore に設定されていてもコネクタはエラーをスローします。

このパラメーターの可能な値は次のとおりです:

  • error

  • ignore

デフォルト値は error です。

time_output_format

このパラメーターを使用すると、ユーザーは、返される TIME データの形式を指定できます。

このパラメーターの可能な値は、 時刻形式 で指定された時刻形式の可能な値です。

このパラメーターは出力のみに影響し、入力には影響しません。

timestamp_ntz_output_format. timestamp_ltz_output_format. timestamp_tz_output_format

これらのオプションは、タイムスタンプ値の出力形式を指定します。これらのオプションのデフォルト値は次のとおりです。

構成オプション

デフォルト値

timestamp_ntz_output_format

"YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_ltz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

timestamp_tz_output_format

"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"

これらのオプションが "sf_current" に設定されている場合、コネクタはセッションに指定された形式を使用します。

partition_size_in_mb

このパラメーターは、クエリ結果セットが非常に大きく、複数の DataFrame パーティションに分割する必要がある場合に使用されます。このパラメーターは、各 DataFrame パーティションの推奨される非圧縮サイズを指定します。パーティションの数を減らすには、このサイズを大きくします。

このサイズは推奨サイズとして使用されます。パーティションの実際のサイズはこれより小さくても大きくてもかまいません。

このオプションは、 use_copy_unload パラメーターが FALSE の場合にのみ適用されます。

このパラメーターはオプションです。

デフォルト値は 100 (MB)です。

use_copy_unload

これが FALSE の場合、Snowflakeは SELECTing データのときにArrowデータ形式を使用します。これが TRUE に設定されている場合、Snowflakeは、選択したデータを送信するために COPY UNLOAD コマンドを使用する古い動作に戻ります。

このパラメーターはオプションです。

デフォルト値は FALSE です。

treat_decimal_as_long

TRUE の場合、型 Decimal(precision, 0) を返すクエリに対して(BigDecimal 値ではなく) Long 値を返すようにSparkコネクタを構成します。

デフォルト値は FALSE です。

このオプションは、Sparkコネクタのバージョン2.11.1で追加されました。

s3_stage_vpce_dns_name

内部ステージにアクセスするための VPC エンドポイントの DNS 名を指定します。

このオプションは、Sparkコネクタのバージョン2.11.1で追加されました。

support_share_connection

FALSE の場合は、Sparkコネクタを構成し、Snowflakeにアクセスするために同じSparkコネクタオプションを使用する各ジョブまたはアクションに対して新しい JDBC 接続を作成します。

デフォルト値は TRUE です。これは、異なるジョブやアクションが同じSparkコネクタオプションを使用してSnowflakeにアクセスする場合に、同じ JDBC 接続を共有することを意味します。

プログラムを使用してこの設定を有効または無効にする必要がある場合は、次のstaticグローバル関数を使用します。

  • SparkConnectorContext.disableSharedConnection()

  • SparkConnectorContext.enableSharingJDBCConnection()

注釈

次の特別なケースでは、Sparkコネクタは共有の JDBC 接続を使用しません。

  • 準備処理または事後処理が設定され、それらの準備処理または事後処理が CREATE TABLE、 DROP TABLE、または MERGE INTO ではない場合、Sparkコネクタは共有接続を使用しません。

  • Utils.runQuery()Utils.getJDBCConnection() などのUtilsのユーティリティ関数は、共有接続を使用しません。

このオプションは、Sparkコネクタのバージョン2.11.2で追加されました。

force_skip_pre_post_action_check_for_shared_session

TRUE の場合は、セッション共有のための準備処理と事後処理の検証を無効にするようにSparkコネクタを構成します。

デフォルト値は FALSE です。

重要

このオプションを設定する前に、準備処理と事後処理のクエリがセッションの設定に影響を与えないことを確認してください。そうしないと、結果に問題が発生する可能性があります。

このオプションは、Sparkコネクタのバージョン2.11.3で追加されました。

キーペア認証およびキーペアローテーションの使用

Sparkコネクタは、キーペア認証とキーローテーションをサポートしています。

  1. 開始するには、 キーペア認証とキーペアローテーション に示すように、キーペア認証の初期構成を完了します。

  2. pem_private_key 接続オプションを使用して、秘密キーの 非暗号化 コピーを送信します。

注意

セキュリティ上の理由から、アプリケーションで pem_private_key をハードコーディングするのではなく、安全なソースからキーを読み取った後、パラメーターを動的に設定する必要があります。キーが暗号化されている場合は、キーを復号化し、復号化したバージョンを送信します。

Pythonの例では、 pem_private_key ファイル、 rsa_key.p8 は次のとおりです。

  • 環境変数 PRIVATE_KEY_PASSPHRASE を使用して、パスワードで保護されたファイルから直接読み取られます。

  • sfOptions 文字列で式 pkb を使用します。

接続するには、Pythonの例をファイル(つまり、 <file.py>)に保存してから、次のコマンドを実行します。

spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>
Copy

Python

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os

with open("<path>/rsa_key.p8", "rb") as key_file:
  p_key = serialization.load_pem_private_key(
    key_file.read(),
    password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
    backend = default_backend()
    )

pkb = p_key.private_bytes(
  encoding = serialization.Encoding.PEM,
  format = serialization.PrivateFormat.PKCS8,
  encryption_algorithm = serialization.NoEncryption()
  )

pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')

sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "pem_private_key" : pkb,
  "sfDatabase" : "<database>",
  "sfSchema" : "schema",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "COLORS") \
    .load()

df.show()
Copy

外部 OAuth の使用

Sparkコネクタバージョン2.7.0以降からサンプルScalaプログラムまたはサンプルPythonスクリプトを使用したSnowflakeへの認証に、 外部 OAuth を使用することができます。

外部 OAuth とSparkコネクタを使用してSnowflakeを認証する前に、サポートされている外部 OAuth 認証サーバーの1つ、または外部 OAuth カスタムクライアント の外部 OAuth セキュリティ統合を構成します。

ScalaおよびPythonの例では、 sfPassword パラメーターが、 sfAuthenticator および sfToken パラメーターに置き換えられていることに注意してください。

Scala:

// spark connector version

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.DataFrame

var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<username>",
    "sfAuthenticator" -> "oauth",
    "sfToken" -> "<external_oauth_access_token>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "region")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()
Copy

Python:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")

# Set options below
sfOptions = {
  "sfURL" : "<account_identifier>.snowflakecomputing.com",
  "sfUser" : "<user_name>",
  "sfAuthenticator" : "oauth",
  "sfToken" : "<external_oauth_access_token>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()
Copy

外部データ転送の AWS オプション

これらのオプションは、一時データが保存されるAmazon S3ロケーションを指定し、そのロケーションにアクセスするための認証の詳細を提供するために使用されます。これらは外部データ転送を行う場合 のみ 必要です。次のいずれかに該当する場合、外部データ転送が必要です:

  • バージョン2.1.x以下のSparkコネクタ(内部転送をサポートしない)を使用している、または

  • 転送には36時間以上かかる可能性がある場合(内部転送では、36時間後に期限が切れる一時的な認証情報が使用されます)。

tempDir

中間データが保存されるS3の場所(例: s3n://xy12345-bucket/spark-snowflake-tmp/)。

tempDir が指定されている場合、次のいずれかも指定する必要があります。

  • awsAccessKeyawsSecretKey . または

  • temporary_aws_access_key_idtemporary_aws_secret_access_keytemporary_aws_session_token

awsAccessKeyawsSecretKey

これらは、 tempDir で指定された場所へのアクセスを許可する標準の AWS 認証情報です。これらのオプションは両方とも一緒に設定する必要があることに注意してください。

設定されている場合、既存の SparkContext オブジェクトから取得できます。

これらの変数を指定する場合、 tempDir も指定する必要があります。

これらの認証情報は、Hadoopクラスターにも設定する必要があります。

temporary_aws_access_key_idtemporary_aws_secret_access_keytemporary_aws_session_token

これらは、 tempDir で指定された場所へのアクセスを許可する一時的な AWS 認証情報です。これらのオプションは3つとも一緒に設定する必要があることに注意してください。

また、これらのオプションが設定されている場合、 awsAccessKey および awsSecretKey オプションよりも優先されます。

temporary_aws_access_key_idtemporary_aws_secret_access_key、および temporary_aws_session_token を指定する場合、 tempDir も指定する必要があります。それ以外の場合、これらのパラメーターは無視されます。

check_bucket_configuration

on (デフォルト)に設定されている場合、コネクタはデータ転送に使用されるバケットにライフサイクルポリシーが構成されているかどうかを確認します(詳細については、 AWS 外部S3バケットの準備 を参照)。ライフサイクルポリシーが存在しない場合、警告がログに記録されます。

off に設定して)このオプションを無効にすると、このチェックはスキップされます。これは、ユーザーがバケットライフサイクルポリシーではなくバケットデータ操作にアクセスできる場合に役立ちます。このオプションを無効にすると、クエリの実行時間をわずかに短縮できます。

詳細については、 データ交換のためのS3の認証 (このトピック)をご参照ください。

外部データ転送のAzureオプション

このセクションでは、外部データ転送を行うときにAzure Blobストレージに適用されるパラメーターについて説明します。次のいずれかに該当する場合、外部データ転送が必要です。

  • バージョン2.1.x以下のSparkコネクタ(内部転送をサポートしない)を使用している、または

  • 転送には36時間以上かかる可能性がある場合(内部転送では、36時間後に期限が切れる一時的な認証情報が使用されます)。

Azure Blobストレージで外部転送を使用する場合、以下で説明するパラメーターを使用して、Azureコンテナの場所とそのコンテナの SAS (共有アクセス署名)を指定します。

tempDir

中間データが保存されるAzure Blobストレージコンテナ。これは URLの形式で、たとえば次のとおりです。

wasb://<azureコンテナ>@<azureアカウント>.<azureエンドポイント>/

temporary_azure_sas_token

Azure Blobストレージの SAS トークンを指定します。

詳細については、このトピックの データ交換のためのAzureの認証 をご参照ください。

Sparkの一時ストレージのAzure情報を指定する

Azure Blobストレージを使用して一時ストレージを提供し、SparkとSnowflakeの間でデータを転送する場合、SparkとSnowflake Sparkコネクタに一時ストレージの場所と認証情報を提供する必要があります。

Sparkに一時的な保存場所を提供するには、Sparkクラスターで次のようなコマンドを実行します。

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Copy

最後のコマンドには次の変数が含まれていることに注意してください。

  • <コンテナ> および <アカウント> :Azureデプロイメントのコンテナとアカウント名です。

  • <Azureエンドポイント> :Azureデプロイメントの場所のエンドポイントです。たとえば、Azure US デプロイメントを使用している場合、エンドポイントは blob.core.windows.net である可能性があります。

  • <Azure SAS> :共有アクセス署名セキュリティトークンです。

これらの各変数をAzure BLOBストレージアカウントの適切な情報に置き換えます。

Snowflakeセッションパラメータをコネクタのオプションとして渡す

Spark用Snowflakeコネクタは、Snowflakeへの任意のセッションレベルのパラメータの送信をサポートしています(詳細については、 セッションパラメーター をご参照ください)。これは、 options オブジェクトに ("<キー>" -> "<値>") ペアを追加することで実現できます。 <キー> はセッションパラメーター名で、 <値> は値です。

注釈

<値> は、数値またはブール値(例: "1" または "true")を受け入れるパラメーターであっても、二重引用符で囲まれた文字列でなければなりません。

たとえば、次のコードサンプルは、 "false" の値を持つ USE_CACHED_RESULT セッションパラメーターを渡します。これにより、以前に実行されたクエリの結果の使用が無効になります。

// ... assuming sfOptions contains Snowflake connector options

// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method
Copy

セキュリティに関する考慮事項

お客様は、マルチノードのSparkシステムでは、ノード間の通信が安全であることを確認する必要があります。SparkマスターはSnowflake認証情報をSparkワーカーに送信し、それらのワーカーがSnowflakeステージにアクセスできるようにします。SparkマスターとSparkワーカー間の通信が安全でない場合、認証情報は不正な第三者によって読み取られる可能性があります。

データ交換のためのS3の認証

このセクションでは、データ交換にS3を使用する場合の認証方法について説明します。

このタスクは、次のいずれかの状況で のみ 必要です:

  • Snowflake Connector for Sparkバージョンは2.1.x(またはそれ以下)です。v2.2.0以降、コネクタはデータ交換にSnowflake内部の一時ステージを使用します。現在コネクタのバージョン2.2.0(またはそれ以上)を使用していない場合、Snowflakeは最新バージョンにアップグレードすることを強くお勧めします。

  • Snowflake Connector for Sparkバージョンは2.2.0(またはそれ以上)ですが、ジョブの長さは定期的に36時間を超えています。これは、データ交換のために内部ステージにアクセスするためにコネクタが使用する AWS トークンの最大期間です。

コネクタの古いバージョンを使用している場合、SnowflakeとSparkの間でデータを交換するためにコネクタが使用できるS3の場所を準備する必要があります。

SparkとSnowflakeの間でデータを交換するために使用されるS3バケット/ディレクトリ( tempDir で指定されている)へのアクセスを許可するために、2つの認証方法がサポートされています:

  • 永続的な AWS 認証情報(S3にアクセスするためのHadoop/Spark認証の構成にも使用)

  • 一時的な AWS 認証情報

永続的な AWS 認証情報の使用

これは標準の AWS 認証方法です。 awsAccessKeyawsSecretKey の値のペアが必要です。

注釈

これらの値は、S3にアクセスするためのHadoop/Sparkの構成にも使用する必要があります。例などの詳細については、 S3A または S3Nを使用したHadoop/Sparkの認証 (このトピック)をご参照ください。

例:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)
Copy

sfOptions でサポートされているオプションの詳細については、 外部データ転送の AWS オプション (このトピック)をご参照ください。

S3A または S3Nを使用したHadoop/Sparkの認証

Hadoop/Sparkエコシステムは、S3 にアクセスするための2つの URI スキーム を サポートしています

s3a://

新しい推奨方法(Hadoop 2.7以降の場合)

この方法を使用するには、このトピックのScalaの例を変更して、次のHadoop構成オプションを追加します。

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)
Copy

tempdir オプションも s3a:// を使用していることを確認してください。

s3n://

以前の方法(Hadoop 2.6以前の場合)

一部のシステムでは、次のScalaの例に示すように明示的に指定する必要があります:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)
Copy

一時的な AWS 認証情報の使用

このメソッドは、コネクタの temporary_aws_access_key_idtemporary_aws_secret_access_key、および temporary_aws_session_token 構成オプションを使用します。

この方法では、データ交換に使用されるS3バケット/ディレクトリへの一時的なアクセスのみをSnowflakeに提供することにより、セキュリティを強化できます。

注釈

一時的な認証情報は、コネクタのS3認証を構成するためにのみ使用できます。Hadoop/Spark認証の構成には使用できません。

また、仮の認証情報を提供する場合、提供された永続的な認証情報よりも優先されます。

次のScalaコードサンプルは、一時的な認証情報を使用した認証の例を示しています。

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as temporary_aws_access_key_id are, of course,
// also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())
Copy

sfOptions2options() DataFrame メソッドで使用できるようになりました。

データ交換のためのAzureの認証

このセクションでは、データ交換にAzure Blobストレージを使用する場合の認証方法について説明します。

この方法の認証は、次のいずれかの 状況でのみ 必要です。

  • Snowflake Connector for Sparkバージョンは2.1.x(またはそれ以下)です。v2.2.0以降、コネクタはデータ交換にSnowflake内部の一時ステージを使用します。現在コネクタのバージョン2.2.0(またはそれ以上)を使用していない場合、Snowflakeは最新バージョンにアップグレードすることを強くお勧めします。

  • Snowflake Connector for Sparkバージョンは2.2.0(またはそれ以上)ですが、ジョブの長さは定期的に36時間を超えています。これは、データ交換のために内部ステージにアクセスするためにコネクタが使用するAzureトークンの最大期間です。

コネクタがSnowflakeとSparkの間でデータを交換するために使用できるAzure Blobストレージコンテナを準備する必要があります。

Azure認証情報を使用する

これは、標準のAzure Blobストレージ認証方法です。値のペアが必要です: tempDir ( URL)と temporary_azure_sas_token の値。

注釈

これらの値は、Azure BlobストレージにアクセスするためのHadoop/Sparkの構成にも使用する必要があります。例などの詳細については、 Azureを使用したHadoop/Sparkの認証 (このトピック)をご参照ください。

例:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)

// Then, configure your Snowflake environment
//
val sfOptions = Map(
  "sfURL" -> "<account_identifier>.snowflakecomputing.com",
  "sfUser" -> "<user_name>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database_name>",
  "sfSchema" -> "<schema_name>",
  "sfWarehouse" -> "<warehouse_name>",
  "sfCompress" -> "on",
  "sfSSL" -> "on",
  "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/",
  "temporary_azure_sas_token" -> "<azure_sas>"
)
Copy

sfOptions でサポートされているオプションの詳細については、 外部データ転送のAzureオプション (このトピック)をご参照ください。

Azureを使用したHadoop/Sparkの認証

この方法を使用するには、このトピックのScalaの例を変更して、次のHadoop構成オプションを追加します。

val hadoopConf = sc.hadoopConfiguration
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
Copy

tempdir オプションも wasb:// を使用していることを確認してください。

ブラウザーを介した認証はサポートされていません

Sparkコネクタを使用する場合、ブラウザウィンドウを開いてユーザーに認証情報を要求する認証形式を使用することは実用的ではありません。ウィンドウは、必ずしもクライアントマシンに表示されるとは限りません。したがって、Sparkコネクタは、ブラウザウィンドウを呼び出す MFA (多要素認証)や SSO (シングルサインオン)などの認証の種類をサポートしていません。