Sparkコネクタの使用¶
コネクタは標準のSpark APIに準拠していますが、このトピックで説明するSnowflake固有のオプションが追加されています。
このトピックでは、用語 COPY は次の両方を指します:
COPY INTO <テーブル> (内部または外部のステージからテーブルにデータを転送するために使用されます)。
COPY INTO <場所> (テーブルから内部または外部ステージにデータを転送するために使用されます)。
このトピックの内容:
SnowCD を使用したSnowflakeへのネットワーク接続の検証¶
ドライバーを設定したら、 SnowCD を使用して、Snowflakeへのネットワーク接続を評価およびトラブルシューティングできます。
初期設定プロセス中にオンデマンドで SnowCD をいつでも使用して、Snowflakeへのネットワーク接続を評価およびトラブルシューティングできます。
プッシュダウン¶
Sparkコネクターは、 SQL 操作のSpark論理プランをキャプチャおよび分析することにより、述語とクエリプッシュダウンを適用します。データソースがSnowflakeの場合、操作は SQL クエリに変換され、Snowflakeで実行されてパフォーマンスが向上します。
ただし、この変換にはSpark SQL 演算子からSnowflake式へのほぼ1対1の変換が必要であるため、すべてのSpark SQL 演算子をプッシュダウンできるわけではありません。プッシュダウンが失敗すると、コネクタは最適化されていない実行プランにフォールバックします。サポートされていない操作は、代わりにSparkで実行されます。
注釈
すべての操作でプッシュダウンが必要な場合は、代わりに Snowpark API を使用するようにコードを作成することを検討してください。
以下は、プッシュダウンでサポートされている操作のリストです(以下のすべての関数は、Spark名を使用しています)。関数がこのリストにない場合、Snowflakeにプッシュダウンされるのではなく、それを利用するSparkプランがSparkで実行される場合があります。
集計関数
Average
Corr
CovPopulation
CovSample
Count
Max
Min
StddevPop
StddevSamp
Sum
VariancePop
VarianceSamp
ブール演算子
And
Between
Contains
EndsWith
EqualTo
GreaterThan
GreaterThanOrEqual
In
IsNull
IsNotNull
LessThan
LessThanOrEqual
Not
Or
StartsWith
日付、時刻、およびタイムスタンプ関数
DateAdd
DateSub
Month
Quarter
TruncDate
TruncTimestamp
Year
数学関数
算術演算子「+」(加算)、「-」(減算)、「*」(乗算)、「/」(除算)、および「-」(単項否定)。
Abs
Acos
Asin
Atan
Ceil
CheckOverflow
Cos
Cosh
Exp
Floor
Greatest
Least
Log
Pi
Pow
PromotePrecision
Rand
Round
Sin
Sinh
Sqrt
Tan
Tanh
その他の演算子
Alias(AS 式)
BitwiseAnd
BitwiseNot
BitwiseOr
BitwiseXor
CaseWhen
Cast(子, t, _)
Coalesce
If
MakeDecimal
ScalarSubquery
ShiftLeft
ShiftRight
SortOrder
UnscaledValue
関係演算子
集計関数とgroup-by句
Distinct
Filters
In
InSet
Joins
Limits
Projections
Sorts(ORDER BY)
UnionおよびUnion All
ウィンドウ関数とwindowing句
文字列関数
Ascii
Concat(子)
Length
Like
Lower
StringLPad
StringRPad
StringTranslate
StringTrim
StringTrimLeft
StringTrimRight
Substring
Upper
ウィンドウ関数(注: これらはSpark 2.2では機能しません)
DenseRank
Rank
RowNumber
Scalaでコネクタを使用する¶
データソースクラス名の指定¶
SnowflakeをSparkのデータソースとして使用するには、 .format
オプションを使用して、データソースを定義するSnowflakeコネクタクラス名を指定します。
net.snowflake.spark.snowflake
クラス名のコンパイル時チェックを確実にするために、Snowflakeはクラス名に変数を定義することを強くお勧めします。例:
val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
また、便宜上 Utils
クラスは変数を提供し、次のようにインポートできます。
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
注釈
このトピックのすべての例では、クラス定義として SNOWFLAKE_SOURCE_NAME
を使用しています。
セッションでのプッシュダウンの有効化/無効化¶
コネクタのバージョン2.1.0(以降)はクエリプッシュダウンをサポートします。これは、SnowflakeがSparkデータソースである場合にクエリ処理をSnowflakeにプッシュすることにより、パフォーマンスを大幅に改善できます。
デフォルトでは、プッシュダウンは有効になっています。
特定の DataFrame のSparkセッション内でプッシュダウンを無効にするには、
SparkSession
オブジェクトをインスタンス化した後、SnowflakeConnectorUtils.disablePushdownSession
静的メソッドを呼び出してSparkSession
オブジェクトを渡します。例:SnowflakeConnectorUtils.disablePushdownSession(spark)
spark
はSparkSession
オブジェクトです。自動プッシュダウン オプションを
off
に設定して DataFrame を作成します。例:val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("query", query) .option("autopushdown", "off") .load()
options
メソッドに渡すMap
に、autopushdown
オプションを設定することもできます(例: 上記の例のsfOptions
)。
プッシュダウンを無効にした後で再度有効にするには、 SnowflakeConnectorUtils.enablePushdownSession
静的メソッドを呼び出し(SparkSession
オブジェクトを渡す)、 autopushdown
を有効にして DataFrame を作成します。
SnowflakeからSparkへのデータの移動¶
注釈
DataFramesを使用する場合、Snowflakeコネクタは SELECT クエリのみをサポートします。
SnowflakeからSpark DataFrame にデータを読み込むには、
read()
メソッド(SqlContext
オブジェクトの)を使用してDataFrameReader
を構成します。format()
メソッドを使用して、SNOWFLAKE_SOURCE_NAME
を指定します。定義については、 データソースクラス名の指定 (このトピック)をご参照ください。option()
またはoptions()
メソッドを使用して、コネクタオプションを指定します。詳細については、 コネクタの構成オプションの設定 (このトピック)をご参照ください。読み取るテーブルデータに次のオプションのいずれかを指定します。
dbtable
:読み取るテーブルの名前。すべての列と記録が取得されます(つまり、SELECT * FROM db_table
と同等)。query
:実行する正確なクエリ(SELECT ステートメント)。
使用上の注意¶
現在、 DataFrames を使用するときに、コネクタは他の種類のクエリ(例: SHOW 、 DESC、または DML ステートメント)をサポートしません。
個々の行のサイズには上限があります。詳細については、 クエリテキストサイズの制限 をご参照ください。
パフォーマンスの考慮事項¶
SnowflakeとSparkの間でデータを転送する場合、次の方法を使用してパフォーマンスを分析/改善します。
net.snowflake.spark.snowflake.Utils.getLastSelect()
メソッドを使用して、SnowflakeからSparkにデータを移動するときに発行される実際のクエリを確認します。Spark DataFrameの
filter
またはwhere
機能を使用する場合、発行された SQL クエリにそれぞれのフィルターが存在することを確認してください。Snowflakeコネクタは、Sparkによって要求されたすべてのフィルターを SQLに変換しようとします。ただし、現在のSparkインフラストラクチャがSnowflakeコネクタに渡さないフィルターの形式があります。その結果、状況によっては、Snowflakeから多数の不要なレコードが要求されます。
列のサブセットのみが必要な場合は、 SQL クエリでサブセットが反映されていることを確認してください。
一般に、発行された SQL クエリが DataFrame 操作に基づいて期待するものと一致しない場合、
query
オプションを使用して、希望する正確な SQL 構文を提供します。
例¶
テーブル全体を読む:
val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t1") .load()
クエリの結果を読み取ります:
val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1") .load()
SparkからSnowflakeへのデータの移動¶
DataFrame の内容をSnowflakeのテーブルに保存する手順は、SnowflakeからSparkへの書き込みと類似しています。
DataFrame
のwrite()
メソッドを使用してDataFrameWriter
を構成します。format()
メソッドを使用して、SNOWFLAKE_SOURCE_NAME
を指定します。定義については、 データソースクラス名の指定 (このトピック)をご参照ください。option()
またはoptions()
メソッドを使用して、コネクタオプションを指定します。詳細については、 コネクタの構成オプションの設定 (このトピック)をご参照ください。データを書き込むテーブルを指定するには、
dbtable
オプションを使用します。コンテンツの保存モードを指定するには、
mode()
メソッドを使用します。詳細については、 SaveMode (Sparkドキュメント)をご参照ください。
例¶
df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t2") .mode(SaveMode.Overwrite) .save()
SparkからSnowflakeへの JSON のエクスポート¶
Spark DataFrames には、文字列としてシリアル化された JSON オブジェクトを含めることができます。次のコードは、通常の DataFrame を JSON データを含む DataFrame に変換する例を示しています。
val rdd = myDataFrame.toJSON val schema = new StructType(Array(StructField("JSON", StringType))) val jsonDataFrame = sqlContext.createDataFrame( rdd.map(s => Row(s)), schema)
結果の jsonDataFrame
には、タイプ StringType
の単一列が含まれています。その結果、この DataFrame が共通の SaveMode.Overwrite
モードでSnowflakeにエクスポートされると、新しい表がSnowflakeにタイプ VARCHAR
の単一列で作成されます。
jsonDataFrame
を VARIANT
列にロードするには、
Snowflakeテーブルを作成します(Snowflake JDBC ドライバーを使用してJavaでSnowflakeに接続)。例で使用されている接続パラメーターの説明については、 JDBC ドライバーの接続パラメーター参照 をご参照ください。
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; public class SnowflakeJDBCExample { public static void main(String[] args) throws Exception { String jdbcUrl = "jdbc:snowflake://myorganization-myaccount.snowflakecomputing.com/"; Properties properties = new Properties(); properties.put("user", "peter"); properties.put("password", "test"); properties.put("account", "myorganization-myaccount"); properties.put("warehouse", "mywh"); properties.put("db", "mydb"); properties.put("schema", "public"); // get connection System.out.println("Create JDBC connection"); Connection connection = DriverManager.getConnection(jdbcUrl, properties); System.out.println("Done creating JDBC connection\n"); // create statement System.out.println("Create JDBC statement"); Statement statement = connection.createStatement(); System.out.println("Done creating JDBC statement\n"); // create a table System.out.println("Create my_variant_table table"); statement.executeUpdate("create or replace table my_variant_table(json VARIANT)"); statement.close(); System.out.println("Done creating demo table\n"); connection.close(); System.out.println("Close connection\n"); } }
既存のテーブルを再利用する場合は、
SaveMode.Overwrite
の代わりに、SaveMode.Append
を使用します。 JSON を表す文字列値がSnowflakeにロードされると、ターゲット列が VARIANT型であるため、 JSONとして解析されます。例:df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "my_variant_table") .mode(SaveMode.Append) .save()
DDL/DML SQL ステートメントの実行¶
クエリに加えて、 runQuery()
メソッド( Utils
オブジェクトの)を使用して、 DDL/DML SQL ステートメントを実行します。例:
var sfOptions = Map(
"sfURL" -> "<account_identifier>.snowflakecomputing.com",
"sfUser" -> "<user_name>",
"sfPassword" -> "<password>",
"sfDatabase" -> "<database>",
"sfSchema" -> "<schema>",
"sfWarehouse" -> "<warehouse>"
)
Utils.runQuery(sfOptions, "CREATE TABLE MY_TABLE(A INTEGER)")
sfOptions
は DataFramesの読み書きに使用されるパラメーターマップです。
runQuery
メソッドは TRUE または FALSE のみを返します。結果セットを返さないステートメント、例えば DDL ステートメント(CREATE TABLE
のような)、および DML ステートメント( INSERT
、 UPDATE
、 DELETE
のような)を対象としています。 SELECT
や SHOW
などの結果セットを返すステートメントでは有効ではありません。
タイムスタンプとタイムゾーンの操作¶
Sparkでは、Scala/Java Timestamp型と同等のタイムスタンプ1種類のみを提供します。Snowflakeの TIMESTAMP_LTZ (現地時間帯)データ型の動作とほぼ同様です。そのため、SparkとSnowflakeの間でデータを転送するときは、タイムゾーンに準じた時間を正しく保つために、次の方法を使用することをお勧めします。
Snowflakeでは、 TIMESTAMP_LTZ データ型のみを使用します。
注釈
デフォルトのタイム・スタンプ・データ・タイプ・マッピングは TIMESTAMP_NTZ (タイムゾーンなし)であるため、 必ず TIMESTAMP_TYPE_MAPPING パラメーターにより TIMESTAMP_LTZ の使用を明示的に設定します。
Sparkのタイムゾーンを
UTC
に設定し、Snowflakeでこのタイムゾーンを使用します(例:コネクタにsfTimezone
オプションを設定しないでください。また、Snowflakeでタイムゾーンを明示的に設定しないでください)。このシナリオでは、 TIMESTAMP_LTZ と TIMESTAMP_NTZ は事実上同等です。タイムゾーンを設定するには、Sparkコードに次の行を追加します:
java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
これらのアプローチのいずれも実装しない場合、望ましくない時間変更が発生する可能性があります。たとえば、次のシナリオを考えます:
Sparkのタイムゾーンは
America/New_York
に設定されています。Snowflakeのタイムゾーンは
Europe/Warsaw
に設定されています。これは次のいずれかの方法で発生します。コネクタの
sfTimezone
をEurope/Warsaw
に設定しています。コネクタの
sfTimezone
をsnowflake
に設定し、Snowflakeの TIMEZONE セッションパラメータをEurope/Warsaw
に設定します。
Snowflakeでは TIMESTAMP_NTZ と TIMESTAMP_LTZ の両方が使用されています。
このシナリオでは、
Snowflakeの TIMESTAMP_NTZ 列で
12:00:00
を表す値がSparkに送信された場合、この値にはタイムゾーン情報は含まれません。Sparkはこの値、ニューヨークの12:00:00
として扱います。Sparkがこの値
12:00:00
(ニューヨーク)をSnowflakeに送り返して TIMESTAMP_LTZ 列にロードすると、自動的に変換され、18:00:00
(ワルシャワタイムゾーン用)としてロードされます。この値がSnowflakeで TIMESTAMP_NTZ に変換されると、ユーザーには
18:00:00
が表示されますが、これは元の値12:00:00
とは異なります。
要約すると、これらのルールの少なくとも1つを 少なくとも 厳守することをお勧めします。
SparkとSnowflakeの両方に、 同じ タイムゾーン、理想的には
UTC
を使用します。SparkとSnowflakeの間でデータを転送するには、 TIMESTAMP_LTZ データ型 のみ を使用します。
サンプルScalaプログラム¶
重要
このサンプルプログラムは、バージョン2.2.0(またはそれ以上)のコネクタを使用していることを前提としています。Snowflake内部ステージを使用して一時データを保存するため、一時データの保存にS3の場所は必要ありません。以前のバージョンを使用している場合は、既存のS3の場所があり、 tempdir
、 awsAccessKey
、 awsSecretKey
の値( sfOptions
の)を含める必要があります。詳細については、 外部データ転送の AWS オプション (このトピック)をご参照ください。
次のScalaプログラムは、Spark用Snowflakeコネクタの完全な使用例を提供します。コードを使用する前に、 コネクタの構成オプションの設定 (このトピック内)で説明されているように、次の文字列を適切な値に置き換えます。
<アカウント識別子>
: 使用する アカウント識別子。<ユーザー名>
、<パスワード>
: Snowflakeユーザーのログイン認証情報。<データベース>
、<スキーマ>
、<ウェアハウス>
:Snowflakeセッションのデフォルト。
サンプルのScalaプログラムは、基本認証(つまり、ユーザー名とパスワード)を使用します。 OAuth で認証する場合は、 外部 OAuth の使用(このトピック内)をご参照ください。
import org.apache.spark.sql._
//
// Configure your Snowflake environment
//
var sfOptions = Map(
"sfURL" -> "<account_identifier>.snowflakecomputing.com",
"sfUser" -> "<user_name>",
"sfPassword" -> "<password>",
"sfDatabase" -> "<database>",
"sfSchema" -> "<schema>",
"sfWarehouse" -> "<warehouse>"
)
//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", "t1")
.load()
//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("query", "select c1, count(*) from t1 group by c1")
.load()
//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOptions)
.option("dbtable", "t2")
.mode(SaveMode.Overwrite)
.save()
Pythonでのコネクタの使用¶
Pythonでコネクタを使用することは、Scalaの使用法に非常に似ています。
Sparkディストリビューションに含まれている bin/pyspark
スクリプトを使用することをお勧めします。
pyspark
スクリプトの構成¶
pyspark
スクリプトを spark-shell
スクリプトと同様に構成する必要があります( --packages
または --jars
オプションを使用)。例:
bin/pyspark --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3
Snowflake Sparkコネクタおよび JDBC コネクタ.jarファイルを CLASSPATH 環境変数に含めることを忘れないでください。
spark-shell
スクリプトの構成の詳細については、 ステップ4:ローカルSparkクラスターまたはAmazon EMR がホストするSpark環境を構成する をご参照ください。
セッションでのプッシュダウンの有効化/無効化¶
コネクタのバージョン2.1.0(以降)はクエリプッシュダウンをサポートします。これは、SnowflakeがSparkデータソースである場合にクエリ処理をSnowflakeにプッシュすることにより、パフォーマンスを大幅に改善できます。
デフォルトでは、プッシュダウンは有効になっています。
特定の DataFrame のSparkセッション内でプッシュダウンを無効にするには、
SparkSession
オブジェクトをインスタンス化した後、SnowflakeConnectorUtils.disablePushdownSession
静的メソッドを呼び出してSparkSession
オブジェクトを渡します。例:sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
自動プッシュダウン オプションを
off
に設定して DataFrame を作成します。例:df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", query) \ .option("autopushdown", "off") \ .load()
options
メソッドに渡すDictionary
に、autopushdown
オプションを設定することもできます(例: 上記の例のsfOptions
)。
プッシュダウンを無効にした後で再度有効にするには、 SnowflakeConnectorUtils.enablePushdownSession
静的メソッドを呼び出し(SparkSession
オブジェクトを渡す)、 autopushdown
を有効にして DataFrame を作成します。
サンプルPythonスクリプト¶
重要
このサンプルスクリプトは、バージョン2.2.0(またはそれ以上)のコネクタを使用していることを前提としています。Snowflake内部ステージを使用して一時データを保存するため、このデータの保存にS3の場所は必要ありません。以前のバージョンを使用している場合は、既存のS3の場所があり、 tempdir
、 awsAccessKey
、 awsSecretKey
の値( sfOptions
の)を含める必要があります。詳細については、 外部データ転送の AWS オプション (このトピック)をご参照ください。
pyspark
スクリプトを構成したら、 SQL クエリおよびその他の操作を実行できます。以下は、単純な SQL クエリを実行するPythonスクリプトの例です。このスクリプトは、基本的なコネクタの使用法を示しています。このドキュメントのScalaの例のほとんどは、Pythonで使用するための最小限の労力/変更で適応できます。
サンプルPythonスクリプトは、基本認証(つまり、ユーザー名とパスワード)を使用します。 OAuth で認証する場合は、 外部 OAuth の使用(このトピック内)をご参照ください。
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext
sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>')
# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>")
# Set options below
sfOptions = {
"sfURL" : "<account_identifier>.snowflakecomputing.com",
"sfUser" : "<user_name>",
"sfPassword" : "<password>",
"sfDatabase" : "<database>",
"sfSchema" : "<schema>",
"sfWarehouse" : "<warehouse>"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("query", "select 1 as my_num union all select 2 as my_num") \
.load()
df.show()
Tip
sfOptions
と SNOWFLAKE_SOURCE_NAME
の使用に注意してください。これにより、コードが簡素化され、エラーが発生する可能性が低くなります。
sfOptions
でサポートされているオプションの詳細については、 コネクタの構成オプションの設定 (このトピック)をご参照ください。
データ型マッピング¶
Sparkコネクタは、多くの一般的なデータ型間の変換をサポートしています。
Spark SQL からSnowflakeへ¶
Spark データ タイプ
Snowflake データ タイプ
ArrayType
VARIANT
BinaryType
サポート対象外
BooleanType
BOOLEAN
ByteType
INTEGER.Snowflakeは BYTE タイプをサポートしていません。
DateType
DATE
DecimalType
DECIMAL
DoubleType
DOUBLE
FloatType
FLOAT
IntegerType
INTEGER
LongType
INTEGER
MapType
VARIANT
ShortType
INTEGER
StringType
長さが指定されている場合 VARCHAR(N)、そうでない場合 VARCHAR
StructType
VARIANT
TimestampType
TIMESTAMP
SnowflakeからSpark SQLまで¶
Snowflake データ タイプ
Spark データ タイプ
ARRAY
StringType
BIGINT
DecimalType(38, 0)
BINARY
サポート対象外
BLOB
サポート対象外
BOOLEAN
BooleanType
CHAR
StringType
CLOB
StringType
DATE
DateType
DECIMAL
DecimalType
DOUBLE
DoubleType
FLOAT
DoubleType
INTEGER
DecimalType(38, 0)
OBJECT
StringType
TIMESTAMP
TimestampType
TIME
StringType
(Sparkコネクタバージョン2.4.14以降)VARIANT
StringType
DataFrame.showメソッドの呼び出し¶
DataFrame.show
メソッドを呼び出し、 DataFrame の行数よりも少ない数を渡す場合は、並べ替えられた順序で表示する行のみを含む DataFrame を作成します。
これを実行するには、
最初に
sort
メソッドを呼び出して、並べ替えられた行を含む DataFrame を返します。その DataFrame で
limit
メソッドを呼び出し、表示する行だけを含む DataFrame を返します。返された DataFrame で
show
メソッドを呼び出します。
たとえば、5行を表示し、結果を列 my_col
で並べ替える場合は、
val dfWithRowsToShow = originalDf.sort("my_col").limit(5) dfWithRowsToShow.show(5)
それ以外では、 show
を呼び出して DataFrame の行のサブセットを表示すると、コードの実行が異なる場合に、異なる行が表示される可能性があります。
コネクタの構成オプションの設定¶
次のセクションでは、コネクタの動作を構成するために設定するオプションのリストを示します。
これらのオプションを設定するには、 Spark DataframeReader クラスの .option(< キー >, < 値 >)
、または .options(< マップ >)
メソッドを呼び出します。
Tip
オプションの使用を容易にするために、単一の Map
オブジェクトでオプションを指定し、 .options(< マップ >)
を呼び出してオプションを設定することをお勧めします。
必要な接続オプション¶
Snowflakeに接続するには、次のオプションが必要です。
sfUrl
アカウントの ホスト名 を次の形式で指定します。
account_identifier.snowflakecomputing.com
account_identifier
は、 使用する アカウント識別子 です。sfUser
Snowflakeユーザーのログイン名。
認証には、次のいずれかのオプションも使用する必要があります。
sfPassword
Snowflakeユーザーのパスワード。
pem_private_key
キーペア認証用のシークレットキー( PEM 形式)。手順については、 キーペア認証とキーペアローテーション をご参照ください。
sfAuthenticator
Snowflakeへの認証に 外部 OAuth を使用することを指定します。値を
oauth
に設定します。外部 OAuth を使用するには、
sfToken
パラメーターを設定する必要があります。
sfToken
外部 OAuth を使用している場合には、値を外部 OAuth アクセストークンに設定する必要があります。
この接続パラメーターでは、
sfAuthenticator
パラメーター値をoauth
に設定する必要があります。デフォルトはありません。
必要なコンテキストオプション¶
セッションのデータベースとスキーマコンテキストを設定するには、次のオプションが必要です:
sfDatabase
接続後にセッションに使用するデータベース。
sfSchema
接続後にセッションに使用するスキーマ。
その他のコンテキストオプション¶
このセクションにリストされているオプションは必須ではありません。
sfAccount
アカウント識別子(例:
myorganization-myaccount
)。アカウント識別子がsfUrl
で指定されているため、このオプションは不要になりました。ここでは、下位互換性のためにのみ文書化されています。sfWarehouse
接続後にセッションに使用するデフォルトの仮想ウェアハウス。
sfRole
接続後にセッションに使用するデフォルトのセキュリティロール。
プロキシオプション¶
このセクションにリストされているオプションは必須ではありません。
use_proxy
コネクタが、プロキシを使用するかどうかを指定します。
true
はコネクタがプロキシを使用する必要があることを指定します。false
はコネクタがプロキシを使用する必要がないことを指定します。
デフォルト値は
false
です。proxy_host
(
use_proxy
がtrue
の場合に必須)使用するプロキシサーバーのホスト名を指定します。proxy_port
(
use_proxy
がtrue
の場合に必須)使用するプロキシサーバーのポート番号を指定します。proxy_protocol
プロキシサーバーへの接続に使用するプロトコルを指定します。次の値のいずれかを指定できます。
http
https
デフォルト値は
http
です。これは、AWS のSnowflakeでのみサポートされています。
このオプションは、Sparkコネクタのバージョン2.11.1で追加されました。
proxy_user
プロキシサーバーへの認証用のユーザー名を指定します。プロキシサーバーが認証を必要とする場合はこの設定を行います。
これは、AWS のSnowflakeでのみサポートされています。
proxy_password
プロキシサーバーへの認証用の
proxy_user
のパスワードを指定します。プロキシサーバーが認証を必要とする場合はこの設定を行います。これは、AWS のSnowflakeでのみサポートされています。
non_proxy_hosts
プロキシサーバーをバイパスして、コネクタが直接接続する必要があるホストのリストを指定します。
ホスト名は URL をエスケープしたパイプ記号(
%7C
)で区切ります。アスタリスク(*
)をワイルドカードとして使用することもできます。これは、AWS のSnowflakeでのみサポートされています。
追加オプション¶
このセクションにリストされているオプションは必須ではありません。
sfTimezone
Sparkでの作業時にSnowflakeが使用するタイムゾーン。パラメーターはSnowflakeのタイムゾーンのみを設定することに注意してください。Spark環境は変更されません。サポートされている値は次のとおりです。
spark
:Sparkのタイムゾーンを使用します(デフォルト)。snowflake
:Snowflakeの現在のタイムゾーンを使用します。sf_default
:接続しているSnowflakeユーザーのデフォルトのタイムゾーンを使用します。time_zone
: 有効な場合、特定のタイムゾーン(例:America/New_York
)を使用します。このオプションの設定の影響の詳細については、 タイムスタンプとタイムゾーンの操作 (このトピック内)をご参照ください。
sfCompress
on
(デフォルト)に設定すると、SnowflakeとSparkの間で渡されるデータが圧縮されます。s3MaxFileSize
SnowflakeからSparkにデータを移動するときに使用されるファイルのサイズ。デフォルトは 10MB です。
preactions
SparkとSnowflakeの間でデータが転送される前に実行される SQL コマンドのセミコロン区切りリスト。
SQL コマンドに
%s
が含まれる場合、%s
は操作で参照されるテーブル名に置き換えられます。postactions
SparkとSnowflakeの間でデータが転送される後に実行される SQL コマンドのセミコロン区切りリスト。
SQL コマンドに
%s
が含まれる場合、これは操作で参照されるテーブル名に置き換えられます。truncate_columns
on
(デフォルト)に設定すると、 COPY コマンドはターゲット列の長さを超えるテキスト文字列を自動的に切り捨てます。off
に設定した場合、ロードされた文字列がターゲット列の長さを超えると、コマンドはエラーを生成します。truncate_table
このパラメーターは、そのテーブルを上書きするときに、SnowflakeがSnowflakeターゲットテーブルのスキーマを保持するかどうかを制御します。
デフォルトでは、Snowflakeのターゲットテーブルが上書きされると、そのターゲットテーブルのスキーマも上書きされます。新しいスキーマは、ソーステーブル(Sparkデータフレーム)のスキーマに基づいています。
ただし、ソースのスキーマが理想的でない場合があります。例えば、ユーザーは、最初のソース列のデータ型が INTEGERであっても、Snowflakeターゲットテーブルが将来 FLOAT 値を格納できるようにしたい場合があります。その場合は、Snowflakeテーブルのスキーマを上書きしないでください。Snowflakeテーブルは単に切り捨ててから、現在のスキーマで再利用する必要があります。
このパラメーターの可能な値は次のとおりです:
on
off
このパラメーターが
on
の場合、ターゲットテーブルの元のスキーマが保持されます。このパラメーターがoff
の場合、テーブルの古いスキーマは無視され、ソースのスキーマに基づいて新しいスキーマが生成されます。このパラメーターはオプションです。
このパラメーターのデフォルト値は
off
です(つまり、デフォルトでは元のテーブルスキーマが上書きされます)。Sparkデータ型からSnowflakeデータ型へのマッピング(およびその逆)の詳細については、 データ型マッピング (このトピック)をご参照ください。
continue_on_error
この変数は、ユーザーが無効なデータ(例えば、バリアントデータ型列の無効な JSON 形式)を入力した場合に COPY コマンドを中止するかどうかを制御します。
可能な値は次のとおりです:
on
off
値
on
は、エラーが発生しても続行することを意味します。値off
は、エラーが発生した場合に中止することを意味します。このパラメーターはオプションです。
このパラメーターのデフォルト値は
off
です。このオプションをオンにすることはお勧めしません。Sparkコネクタを使用して COPYing からSnowflakeにエラーが報告された場合、データが失われる可能性があります。
注釈
行が拒否されたまたは欠落している場合、入力ソースでそれらの行に明らかに欠陥がない場合は、Snowflakeに報告してください。
usestagingtable
このパラメーターは、データのロードでステージングテーブルを使用するかどうかを制御します。
ステージングテーブルは、コネクタによって作成される通常のテーブル(一時的な名前を持つ)です。データのロード操作が成功すると、元のターゲットテーブルが削除され、ステージングテーブルの名前が元のターゲットテーブルの名前に変更されます。データのロード操作が失敗した場合、ステージングテーブルは削除され、ターゲットテーブルには操作の直前に保持していたデータが残ります。したがって、ステージングテーブルを使用すると、操作が失敗した場合に元のターゲットテーブルデータを保持できます。安全のため、Snowflakeはほとんどの状況でステージングテーブルを使用することを強くお勧めします。
コネクタがステージングテーブルを作成するには、Sparkコネクタを介して COPY を実行するユーザーに、テーブルを作成するための十分な権限が必要です。ユーザーがテーブルを作成する権限を持っていない場合、直接ロード(つまり、ステージングテーブルを使用しない読み込み)は便利です。
このパラメーターの可能な値は次のとおりです:
on
off
パラメーターが
on
の場合、ステージングテーブルが使用されます。このパラメーターがoff
の場合、データはターゲットテーブルに直接ロードされます。このパラメーターはオプションです。
このパラメーターのデフォルト値は
on
です(つまり、ステージングテーブルを使用します)。
autopushdown
このパラメーターは、自動クエリプッシュダウンを有効にするかどうかを制御します。
プッシュダウンが有効になっている場合、クエリがSparkで実行されるときに、クエリの一部をSnowflakeサーバーに「プッシュダウン」できる場合、プッシュダウンされます。これにより、一部のクエリのパフォーマンスが向上します。
このパラメーターはオプションです。
コネクタがSparkの互換バージョンに接続されている場合、デフォルト値は
on
です。それ以外の場合、デフォルト値はoff
です。コネクタが意図されているものとは異なるバージョンのSparkに接続されている場合(例: コネクタのバージョン3.2がSparkのバージョン3.3に接続されている場合)、このパラメーターが
on
に設定されていても自動プッシュダウンは無効になります。purge
これが
on
に設定されている場合、コネクタは外部データ転送を介してSparkからSnowflakeに転送するときに作成された一時ファイルを削除します。このパラメーターがoff
に設定されている場合、これらのファイルはコネクターによって自動的に削除されません。パージは、SparkからSnowflakeへの転送に対してのみ機能し、SnowflakeからSparkへの転送には機能しません。
可能な値は次のとおりです
on
off
デフォルト値は
off
です。columnmap
このパラメーターは、SparkからSnowflakeにデータを書き込み、Snowflakeテーブルの列名がSparkテーブルの列名と一致しない場合に役立ちます。各Snowflake宛先列に対応するSparkソース列を示すマップを作成できます。
パラメータは、次の形式の単一の文字列リテラルです。
"Map(col_2 -> col_b, col_3 -> col_a)"
たとえば、次のシナリオを考えます:
Sparkの
df
という名前のデータフレームには3つの列があります:col_1
、col_2
、col_3
Snowflakeの
tb
という名前のテーブルには2つの列があります:col_a
、col_b
次の値をコピーします:
df.col_2
からtb.col_b
に。df.col_3
からtb.col_a
に。
columnmap
パラメーターの値は次のとおりです。Map(col_2 -> col_b, col_3 -> col_a)
次のScalaコードを実行して、この値を生成できます。
Map("col_2"->"col_b","col_3"->"col_a").toString()
このパラメーターのデフォルト値はnullです。つまり、デフォルトでは、ソース表と宛先表の列名は一致する必要があります。
このパラメーターは、SparkからSnowflakeに書き込むときにのみ使用されます。SnowflakeからSparkに書き込む場合は適用されません。
keep_column_case
SparkからSnowflakeにテーブルを書き込む場合、Sparkコネクタはデフォルトで、列名が二重引用符で囲まれていない限り、列名の文字を大文字にシフトします。
SnowflakeからSparkにテーブルを書き込む場合、Sparkコネクタはデフォルトで、大文字、アンダースコア、数字を除くすべての文字を含む列名を二重引用符で囲みます。
keep_column_caseを
on
に設定すると、Sparkコネクタはこれらの変更を行いません。可能な値は次のとおりです:
on
off
デフォルト値は
off
です。column_mapping
コネクタは、Sparkデータフレームの列をSnowflakeテーブルにマップする必要があります。これは、列名に基づいて(順序に関係なく)、または列の順序に基づいて実行できます(つまり、データフレームの最初の列は、列名に関係なく、テーブルの最初の列にマップされます)。
デフォルトでは、マッピングは順序に基づいて行われます。このパラメーターを
name
に設定することでオーバーライドできます。これは、コネクタに列名に基づいて列をマップするよう指示します。(名前のマッピングでは大文字と小文字は区別されません。)このパラメーターの可能な値は次のとおりです:
order
name
デフォルト値は
order
です。column_mismatch_behavior
このパラメーターは、
column_mapping
パラメーターがname
に設定されている場合にのみ適用されます。Sparkデータフレームの列名とSnowflakeテーブルが一致しない場合、
column_mismatch_behavior
がerror
の場合、Sparkコネクタはエラーを報告します。column_mismatch_behavior
がignore
の場合、Sparkコネクタはエラーを無視します。ドライバーは、Snowflakeテーブルに対応する列がないSparkデータフレームの列をすべて破棄します。
ドライバーは、Sparkデータフレームに対応する列がないSnowflakeテーブルの列に NULLs を挿入します。
潜在的なエラーは次のとおりです:
Sparkデータフレームには、大文字と小文字を除いて同一の列を含めることができます。列名のマッピングでは大文字と小文字が区別されないため、データフレームからテーブルへの正しいマッピングを決定することはできません。
Snowflakeテーブルには、大文字と小文字を除いて同一の列を含めることができます。列名のマッピングでは大文字と小文字が区別されないため、データフレームからテーブルへの正しいマッピングを決定することはできません。
SparkデータフレームとSnowflakeテーブルには、共通の列名がない場合があります。理論的には、Sparkコネクタはすべての行のすべての列に NULLs を挿入できますが、これは通常無意味であるため、
column_mismatch_behavior
がignore
に設定されていてもコネクタはエラーをスローします。
このパラメーターの可能な値は次のとおりです:
error
ignore
デフォルト値は
error
です。time_output_format
このパラメーターを使用すると、ユーザーは、返される
TIME
データの形式を指定できます。このパラメーターの可能な値は、 時刻形式 で指定された時刻形式の可能な値です。
このパラメーターは出力のみに影響し、入力には影響しません。
timestamp_ntz_output_format
、 .timestamp_ltz_output_format
、 .timestamp_tz_output_format
これらのオプションは、タイムスタンプ値の出力形式を指定します。これらのオプションのデフォルト値は次のとおりです。
構成オプション
デフォルト値
timestamp_ntz_output_format
"YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_ltz_output_format
"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"
timestamp_tz_output_format
"TZHTZM YYYY-MM-DD HH24:MI:SS.FF3"
これらのオプションが
"sf_current"
に設定されている場合、コネクタはセッションに指定された形式を使用します。partition_size_in_mb
このパラメーターは、クエリ結果セットが非常に大きく、複数の DataFrame パーティションに分割する必要がある場合に使用されます。このパラメーターは、各 DataFrame パーティションの推奨される非圧縮サイズを指定します。パーティションの数を減らすには、このサイズを大きくします。
このサイズは推奨サイズとして使用されます。パーティションの実際のサイズはこれより小さくても大きくてもかまいません。
このオプションは、 use_copy_unload パラメーターが FALSE の場合にのみ適用されます。
このパラメーターはオプションです。
デフォルト値は
100
(MB)です。
use_copy_unload
これが
FALSE
の場合、Snowflakeは SELECTing データのときにArrowデータ形式を使用します。これがTRUE
に設定されている場合、Snowflakeは、選択したデータを送信するためにCOPY UNLOAD
コマンドを使用する古い動作に戻ります。このパラメーターはオプションです。
デフォルト値は
FALSE
です。treat_decimal_as_long
TRUE
の場合、型Decimal(precision, 0)
を返すクエリに対して(BigDecimal
値ではなく)Long
値を返すようにSparkコネクタを構成します。デフォルト値は
FALSE
です。このオプションは、Sparkコネクタのバージョン2.11.1で追加されました。
s3_stage_vpce_dns_name
内部ステージにアクセスするための VPC エンドポイントの DNS 名を指定します。
このオプションは、Sparkコネクタのバージョン2.11.1で追加されました。
support_share_connection
FALSE
の場合は、Sparkコネクタを構成し、Snowflakeにアクセスするために同じSparkコネクタオプションを使用する各ジョブまたはアクションに対して新しい JDBC 接続を作成します。デフォルト値は
TRUE
です。これは、異なるジョブやアクションが同じSparkコネクタオプションを使用してSnowflakeにアクセスする場合に、同じ JDBC 接続を共有することを意味します。プログラムを使用してこの設定を有効または無効にする必要がある場合は、次のstaticグローバル関数を使用します。
SparkConnectorContext.disableSharedConnection()
SparkConnectorContext.enableSharingJDBCConnection()
注釈
次の特別なケースでは、Sparkコネクタは共有の JDBC 接続を使用しません。
準備処理または事後処理が設定され、それらの準備処理または事後処理が CREATE TABLE、 DROP TABLE、または MERGE INTO ではない場合、Sparkコネクタは共有接続を使用しません。
Utils.runQuery()
やUtils.getJDBCConnection()
などのUtilsのユーティリティ関数は、共有接続を使用しません。
このオプションは、Sparkコネクタのバージョン2.11.2で追加されました。
force_skip_pre_post_action_check_for_shared_session
TRUE
の場合は、セッション共有のための準備処理と事後処理の検証を無効にするようにSparkコネクタを構成します。デフォルト値は
FALSE
です。重要
このオプションを設定する前に、準備処理と事後処理のクエリがセッションの設定に影響を与えないことを確認してください。そうしないと、結果に問題が発生する可能性があります。
このオプションは、Sparkコネクタのバージョン2.11.3で追加されました。
キーペア認証およびキーペアローテーションの使用¶
Sparkコネクタは、キーペア認証とキーローテーションをサポートしています。
開始するには、 キーペア認証とキーペアローテーション に示すように、キーペア認証の初期構成を完了します。
pem_private_key
接続オプションを使用して、秘密キーの 非暗号化 コピーを送信します。
注意
セキュリティ上の理由から、アプリケーションで pem_private_key
をハードコーディングするのではなく、安全なソースからキーを読み取った後、パラメーターを動的に設定する必要があります。キーが暗号化されている場合は、キーを復号化し、復号化したバージョンを送信します。
Pythonの例では、 pem_private_key
ファイル、 rsa_key.p8
は次のとおりです。
環境変数
PRIVATE_KEY_PASSPHRASE
を使用して、パスワードで保護されたファイルから直接読み取られます。sfOptions
文字列で式pkb
を使用します。
接続するには、Pythonの例をファイル(つまり、 <file.py>
)に保存してから、次のコマンドを実行します。
spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 <file.py>
Python
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os
with open("<path>/rsa_key.p8", "rb") as key_file:
p_key = serialization.load_pem_private_key(
key_file.read(),
password = os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
backend = default_backend()
)
pkb = p_key.private_bytes(
encoding = serialization.Encoding.PEM,
format = serialization.PrivateFormat.PKCS8,
encryption_algorithm = serialization.NoEncryption()
)
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n","",pkb).replace("\n","")
sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('Simple App')
sfOptions = {
"sfURL" : "<account_identifier>.snowflakecomputing.com",
"sfUser" : "<user_name>",
"pem_private_key" : pkb,
"sfDatabase" : "<database>",
"sfSchema" : "schema",
"sfWarehouse" : "<warehouse>"
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("query", "COLORS") \
.load()
df.show()
外部 OAuth の使用¶
Sparkコネクタバージョン2.7.0以降からサンプルScalaプログラムまたはサンプルPythonスクリプトを使用したSnowflakeへの認証に、 外部 OAuth を使用することができます。
外部 OAuth とSparkコネクタを使用してSnowflakeを認証する前に、サポートされている外部 OAuth 認証サーバーの1つ、または外部 OAuth カスタムクライアント の外部 OAuth セキュリティ統合を構成します。
ScalaおよびPythonの例では、 sfPassword
パラメーターが、 sfAuthenticator
および sfToken
パラメーターに置き換えられていることに注意してください。
Scala:
// spark connector version val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" import net.snowflake.spark.snowflake2.Utils.SNOWFLAKE_SOURCE_NAME import org.apache.spark.sql.DataFrame var sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<username>", "sfAuthenticator" -> "oauth", "sfToken" -> "<external_oauth_access_token>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) // // Create a DataFrame from a Snowflake table // val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "region") .load() // // Join, augment, aggregate, etc. the data in Spark and then use the // Data Source API to write the data back to a table in Snowflake // df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t2") .mode(SaveMode.Overwrite) .save()
Python:
from pyspark import SparkConf, SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import * sc = SparkContext("local", "Simple App") spark = SQLContext(sc) spark_conf = SparkConf().setMaster('local').setAppName('<APP_NAME>') # You might need to set these sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<AWS_KEY>") sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET>") # Set options below sfOptions = { "sfURL" : "<account_identifier>.snowflakecomputing.com", "sfUser" : "<user_name>", "sfAuthenticator" : "oauth", "sfToken" : "<external_oauth_access_token>", "sfDatabase" : "<database>", "sfSchema" : "<schema>", "sfWarehouse" : "<warehouse>" } SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", "select 1 as my_num union all select 2 as my_num") \ .load() df.show()
外部データ転送の AWS オプション¶
これらのオプションは、一時データが保存されるAmazon S3ロケーションを指定し、そのロケーションにアクセスするための認証の詳細を提供するために使用されます。これらは外部データ転送を行う場合 のみ 必要です。次のいずれかに該当する場合、外部データ転送が必要です:
バージョン2.1.x以下のSparkコネクタ(内部転送をサポートしない)を使用している、または
転送には36時間以上かかる可能性がある場合(内部転送では、36時間後に期限が切れる一時的な認証情報が使用されます)。
tempDir
中間データが保存されるS3の場所(例:
s3n://xy12345-bucket/spark-snowflake-tmp/
)。tempDir
が指定されている場合、次のいずれかも指定する必要があります。awsAccessKey
、awsSecretKey
. またはtemporary_aws_access_key_id
、temporary_aws_secret_access_key
、temporary_aws_session_token
awsAccessKey
、awsSecretKey
これらは、
tempDir
で指定された場所へのアクセスを許可する標準の AWS 認証情報です。これらのオプションは両方とも一緒に設定する必要があることに注意してください。設定されている場合、既存の
SparkContext
オブジェクトから取得できます。これらの変数を指定する場合、
tempDir
も指定する必要があります。これらの認証情報は、Hadoopクラスターにも設定する必要があります。
temporary_aws_access_key_id
、temporary_aws_secret_access_key
、temporary_aws_session_token
これらは、
tempDir
で指定された場所へのアクセスを許可する一時的な AWS 認証情報です。これらのオプションは3つとも一緒に設定する必要があることに注意してください。また、これらのオプションが設定されている場合、
awsAccessKey
およびawsSecretKey
オプションよりも優先されます。temporary_aws_access_key_id
、temporary_aws_secret_access_key
、およびtemporary_aws_session_token
を指定する場合、tempDir
も指定する必要があります。それ以外の場合、これらのパラメーターは無視されます。check_bucket_configuration
on
(デフォルト)に設定されている場合、コネクタはデータ転送に使用されるバケットにライフサイクルポリシーが構成されているかどうかを確認します(詳細については、 AWS 外部S3バケットの準備 を参照)。ライフサイクルポリシーが存在しない場合、警告がログに記録されます。(
off
に設定して)このオプションを無効にすると、このチェックはスキップされます。これは、ユーザーがバケットライフサイクルポリシーではなくバケットデータ操作にアクセスできる場合に役立ちます。このオプションを無効にすると、クエリの実行時間をわずかに短縮できます。
詳細については、 データ交換のためのS3の認証 (このトピック)をご参照ください。
外部データ転送のAzureオプション¶
このセクションでは、外部データ転送を行うときにAzure Blobストレージに適用されるパラメーターについて説明します。次のいずれかに該当する場合、外部データ転送が必要です。
バージョン2.1.x以下のSparkコネクタ(内部転送をサポートしない)を使用している、または
転送には36時間以上かかる可能性がある場合(内部転送では、36時間後に期限が切れる一時的な認証情報が使用されます)。
Azure Blobストレージで外部転送を使用する場合、以下で説明するパラメーターを使用して、Azureコンテナの場所とそのコンテナの SAS (共有アクセス署名)を指定します。
tempDir
中間データが保存されるAzure Blobストレージコンテナ。これは URLの形式で、たとえば次のとおりです。
wasb://<azureコンテナ>@<azureアカウント>.<azureエンドポイント>/
temporary_azure_sas_token
Azure Blobストレージの SAS トークンを指定します。
詳細については、このトピックの データ交換のためのAzureの認証 をご参照ください。
Sparkの一時ストレージのAzure情報を指定する¶
Azure Blobストレージを使用して一時ストレージを提供し、SparkとSnowflakeの間でデータを転送する場合、SparkとSnowflake Sparkコネクタに一時ストレージの場所と認証情報を提供する必要があります。
Sparkに一時的な保存場所を提供するには、Sparkクラスターで次のようなコマンドを実行します。
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
最後のコマンドには次の変数が含まれていることに注意してください。
<コンテナ>
および<アカウント>
:Azureデプロイメントのコンテナとアカウント名です。<Azureエンドポイント>
:Azureデプロイメントの場所のエンドポイントです。たとえば、Azure US デプロイメントを使用している場合、エンドポイントはblob.core.windows.net
である可能性があります。<Azure SAS>
:共有アクセス署名セキュリティトークンです。
これらの各変数をAzure BLOBストレージアカウントの適切な情報に置き換えます。
Snowflakeセッションパラメータをコネクタのオプションとして渡す¶
Spark用Snowflakeコネクタは、Snowflakeへの任意のセッションレベルのパラメータの送信をサポートしています(詳細については、 セッションパラメーター をご参照ください)。これは、 options
オブジェクトに ("<キー>" -> "<値>")
ペアを追加することで実現できます。 <キー>
はセッションパラメーター名で、 <値>
は値です。
注釈
<値>
は、数値またはブール値(例: "1"
または "true"
)を受け入れるパラメーターであっても、二重引用符で囲まれた文字列でなければなりません。
たとえば、次のコードサンプルは、 "false"
の値を持つ USE_CACHED_RESULT セッションパラメーターを渡します。これにより、以前に実行されたクエリの結果の使用が無効になります。
// ... assuming sfOptions contains Snowflake connector options
// Add to the options request to keep connection alive
sfOptions += ("USE_CACHED_RESULT" -> "false")
// ... now use sfOptions with the .options() method
セキュリティに関する考慮事項¶
お客様は、マルチノードのSparkシステムでは、ノード間の通信が安全であることを確認する必要があります。SparkマスターはSnowflake認証情報をSparkワーカーに送信し、それらのワーカーがSnowflakeステージにアクセスできるようにします。SparkマスターとSparkワーカー間の通信が安全でない場合、認証情報は不正な第三者によって読み取られる可能性があります。
データ交換のためのS3の認証¶
このセクションでは、データ交換にS3を使用する場合の認証方法について説明します。
このタスクは、次のいずれかの状況で のみ 必要です:
Snowflake Connector for Sparkバージョンは2.1.x(またはそれ以下)です。v2.2.0以降、コネクタはデータ交換にSnowflake内部の一時ステージを使用します。現在コネクタのバージョン2.2.0(またはそれ以上)を使用していない場合、Snowflakeは最新バージョンにアップグレードすることを強くお勧めします。
Snowflake Connector for Sparkバージョンは2.2.0(またはそれ以上)ですが、ジョブの長さは定期的に36時間を超えています。これは、データ交換のために内部ステージにアクセスするためにコネクタが使用する AWS トークンの最大期間です。
コネクタの古いバージョンを使用している場合、SnowflakeとSparkの間でデータを交換するためにコネクタが使用できるS3の場所を準備する必要があります。
SparkとSnowflakeの間でデータを交換するために使用されるS3バケット/ディレクトリ( tempDir
で指定されている)へのアクセスを許可するために、2つの認証方法がサポートされています:
永続的な AWS 認証情報(S3にアクセスするためのHadoop/Spark認証の構成にも使用)
一時的な AWS 認証情報
永続的な AWS 認証情報の使用¶
これは標準の AWS 認証方法です。 awsAccessKey
と awsSecretKey
の値のペアが必要です。
注釈
これらの値は、S3にアクセスするためのHadoop/Sparkの構成にも使用する必要があります。例などの詳細については、 S3A または S3Nを使用したHadoop/Sparkの認証 (このトピック)をご参照ください。
例:
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>") // Then, configure your Snowflake environment // var sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<user_name>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>", "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"), "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"), "tempdir" -> "s3n://<temp-bucket-name>" )
sfOptions
でサポートされているオプションの詳細については、 外部データ転送の AWS オプション (このトピック)をご参照ください。
S3A または S3Nを使用したHadoop/Sparkの認証¶
Hadoop/Sparkエコシステムは、S3 にアクセスするための2つの URI スキーム を サポートしています:
s3a://
新しい推奨方法(Hadoop 2.7以降の場合)
この方法を使用するには、このトピックのScalaの例を変更して、次のHadoop構成オプションを追加します。
val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3a.access.key", <accessKey>) hadoopConf.set("fs.s3a.secret.key", <secretKey>)
tempdir
オプションもs3a://
を使用していることを確認してください。s3n://
以前の方法(Hadoop 2.6以前の場合)
一部のシステムでは、次のScalaの例に示すように明示的に指定する必要があります:
val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>) hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)
一時的な AWS 認証情報の使用¶
このメソッドは、コネクタの temporary_aws_access_key_id
、 temporary_aws_secret_access_key
、および temporary_aws_session_token
構成オプションを使用します。
この方法では、データ交換に使用されるS3バケット/ディレクトリへの一時的なアクセスのみをSnowflakeに提供することにより、セキュリティを強化できます。
注釈
一時的な認証情報は、コネクタのS3認証を構成するためにのみ使用できます。Hadoop/Spark認証の構成には使用できません。
また、仮の認証情報を提供する場合、提供された永続的な認証情報よりも優先されます。
次のScalaコードサンプルは、一時的な認証情報を使用した認証の例を示しています。
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest import net.snowflake.spark.snowflake.Parameters // ... val sts_client = new AWSSecurityTokenServiceClient() val session_token_request = new GetSessionTokenRequest() // Set the token duration to 2 hours. session_token_request.setDurationSeconds(7200) val session_token_result = sts_client.getSessionToken(session_token_request) val session_creds = session_token_result.getCredentials() // Create a new set of Snowflake connector options, based on the existing // sfOptions definition, with additional temporary credential options that override // the credential options in sfOptions. // Note that constants from Parameters are used to guarantee correct // key names, but literal values, such as temporary_aws_access_key_id are, of course, // also allowed. var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId()) sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey()) sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())
sfOptions2
が options()
DataFrame メソッドで使用できるようになりました。
データ交換のためのAzureの認証¶
このセクションでは、データ交換にAzure Blobストレージを使用する場合の認証方法について説明します。
この方法の認証は、次のいずれかの 状況でのみ 必要です。
Snowflake Connector for Sparkバージョンは2.1.x(またはそれ以下)です。v2.2.0以降、コネクタはデータ交換にSnowflake内部の一時ステージを使用します。現在コネクタのバージョン2.2.0(またはそれ以上)を使用していない場合、Snowflakeは最新バージョンにアップグレードすることを強くお勧めします。
Snowflake Connector for Sparkバージョンは2.2.0(またはそれ以上)ですが、ジョブの長さは定期的に36時間を超えています。これは、データ交換のために内部ステージにアクセスするためにコネクタが使用するAzureトークンの最大期間です。
コネクタがSnowflakeとSparkの間でデータを交換するために使用できるAzure Blobストレージコンテナを準備する必要があります。
Azure認証情報を使用する¶
これは、標準のAzure Blobストレージ認証方法です。値のペアが必要です: tempDir
( URL)と temporary_azure_sas_token
の値。
注釈
これらの値は、Azure BlobストレージにアクセスするためのHadoop/Sparkの構成にも使用する必要があります。例などの詳細については、 Azureを使用したHadoop/Sparkの認証 (このトピック)をご参照ください。
例:
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>) // Then, configure your Snowflake environment // val sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<user_name>", "sfPassword" -> "<password>", "sfDatabase" -> "<database_name>", "sfSchema" -> "<schema_name>", "sfWarehouse" -> "<warehouse_name>", "sfCompress" -> "on", "sfSSL" -> "on", "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/", "temporary_azure_sas_token" -> "<azure_sas>" )
sfOptions
でサポートされているオプションの詳細については、 外部データ転送のAzureオプション (このトピック)をご参照ください。
Azureを使用したHadoop/Sparkの認証¶
この方法を使用するには、このトピックのScalaの例を変更して、次のHadoop構成オプションを追加します。
val hadoopConf = sc.hadoopConfiguration sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb") sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<azure_endpoint>", <azure_sas>)
tempdir
オプションも wasb://
を使用していることを確認してください。
ブラウザーを介した認証はサポートされていません¶
Sparkコネクタを使用する場合、ブラウザウィンドウを開いてユーザーに認証情報を要求する認証形式を使用することは実用的ではありません。ウィンドウは、必ずしもクライアントマシンに表示されるとは限りません。したがって、Sparkコネクタは、ブラウザウィンドウを呼び出す MFA (多要素認証)や SSO (シングルサインオン)などの認証の種類をサポートしていません。