クラウドサービスのファイルデータへの Snowpark Connect for Spark によるアクセス¶
Snowpark Connect for Spark を使えば、Amazon S3、Google Cloud Storage、Azure Blobなどの外部クラウドストレージシステムと直接やり取りすることができます。クラウドストレージからSnowflakeにデータを読み込み、データを処理し、書き戻すことができます。
たとえば、Snowpark Connect for Spark を使用して次のタスクを実行できます。
生データを取り込む。
Snowflakeにファイル(たとえば、 CSV 、 JSON 、Parquet)を移動する前に、S3、Google Cloud、またはAzureに置きます。
ダウンストリーム用にデータをエクスポートする。
ML のトレーニング、外部パートナーとの共有、またはSparkベースの分析を行うために、処理済みのSnowpark DataFrames をクラウドストレージに書き戻します。
ハイブリッドパイプラインを作成する。
既存のデータレイクとの互換性を維持しながら、パイプラインの一部をSnowflakeで維持します。
規制に準拠する、またはコストを削減する。
規制、ガバナンス、予算の制約により、特定のデータセットを外部に保存します。
このトピックに記載されている手順を使用して、これらのクラウドサービスプロバイダーに保存されているファイルとの間で読み取り、書き込みを行います。Snowflakeの外部ステージまたは直接アクセスのいずれかを使用してファイルにアクセスできます。
注意事項¶
Snowpark Connect for Spark を使用してクラウドサービスと連携する際には、以下の注意点に留意してください。
認証---Snowpark Connect for Spark はクラウドの認証情報を自動的に管理しません。自分でアクセスキーを設定するか(AWS)、アカウントキーまたは SAS トークンを保存するか(Azure)、外部ステージを維持する必要があります。認証情報の期限が切れている、または認証情報がない場合、読み取り/書き込みに失敗します。
パフォーマンス---クラウドI/Oは、ネットワーク帯域幅とオブジェクトストアのレイテンシーに依存します。小さなファイルをたくさん読み込むと、パフォーマンスに大きな影響を与える可能性があります。
形式のサポート---読み書きするファイル形式が確実にサポートされているようにします。現在、Snowpark Connect for Spark は、TEXT、CSV、JSON、Parquet などの一般的な形式と同等性があります。ただし、高度な機能(Parquetのパーティション検出やJSON のスキーマ進化など)はSparkとは異なる可能性があります。
権限とポリシー---クラウドバケットへの書き込みには、適切な IAM/ACL ポリシーが必要です。Snowflakeのロールとクラウドの認証情報の間でポリシーが一致していない場合、AccessDenied エラーが発生することがあります。
ベストプラクティス¶
優れたパフォーマンスを発揮する最も信頼性の高い統合を実現するには、以下のベストプラクティスに従います。
セキュアな仮認証情報を使用し、認証情報を頻繁にローテーションします。
データのパーティショニングとバケット化を行います。
Parquetを記述する際は、頻繁にフィルタリングされる列についてパーティショニングし、スキャンコストを削減します。小さなファイルをたくさん使うのではなく、少数の大きなファイル(たとえば、それぞれ 100MB から 500MB)を使います。
書き込み時にスキーマを検証します。
特に JSON やCSV のような半構造化フォーマットの場合は、常にスキーマを明示的に定義します。これにより、Snowflakeと外部データ間のドリフトを防ぐことができます。
コストをモニターします。
コストを削減するために、書き込みの前にファイルを統合し、データをフィルタリングすることを検討します。クラウドプロバイダーのコストは、リクエストごと、スキャンしたバイトごとに発生します。
API 呼び出しを標準化します。
関数やパラメーターを使用する際は、文書化されたガイダンスに正確に従います。こうすることで、互換性を維持し、リグレッションを防ぎ、異なるクラウドプロバイダーをまたいで期待される動作を保証することができます。
Snowflakeの外部ステージを使用したアクセス¶
Amazon S3への安全なアクセスを構成 し、S3ロケーションを指す外部ステージを作成します。
外部ステージから読み取ります。
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
Azureへの安全なアクセスを構成 し、Azureコンテナを指す外部ステージを作成します。
外部ステージから読み取ります。
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
Google Cloudへの安全なアクセスを構成 し、Google Cloud Storageバケットを指す外部ステージを作成します。
外部ステージから読み取ります。
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
直接アクセスによるアクセス¶
ここで説明する手順とコードを使うと、クラウドサービスプロバイダー上のファイルに直接アクセスできます。
AWS の認証情報を使ってSparkの設定を行います。
# For S3 related access with public/private buckets, please add these config change spark.conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled","false") spark.conf.set("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") spark.conf.set("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.2") # For private S3 access, please also provide credentials spark.conf.set("spark.hadoop.fs.s3a.access.key","<AWS_ACCESS_KEY_ID>") spark.conf.set("spark.hadoop.fs.s3a.secret.key","<AWS_SECRET_ACCESS_KEY>") spark.conf.set("spark.hadoop.fs.s3a.session.token","<AWS_SESSION_TOKEN>")
S3で直接読み書きします。
# Read CSV spark.read.csv('s3a://<bucket name>/<file path>') spark.read.option("header", True).csv('s3a://<bucket name>/<file path>') # read with header in file # Write to CSV df.write.csv('s3a://<bucket name>/<file path>') df.write.option("header", True).csv('s3a://<bucket name>/<file path>') # write with header in file # Read Text spark.read.text('s3a://<bucket name>/<file path>') # Write to Text df.write.text('s3a://<bucket name>/<file path>') df.write.format("text").mode("overwrite").save('s3a://<bucket name>/<file path>') # Read Parquet spark.read.parquet('s3a://<bucket name>/<file path>') # Write to Parquet df.write.parquet('s3a://<bucket name>/<file path>') # Read JSON spark.read.json('s3a://<bucket name>/<file path>') # Write to JSON df.write.json('s3a://<bucket name>/<file path>')
Azureの認証情報を使ってSparkの設定を行います。
# For private Azure access, please also provide blob SAS token # * Make sure all required permissions are in place before proceeding spark.conf.set("fs.azure.sas.fixed.token.<storage-account>.dfs.core.windows.net","<Shared Access Token>")
Azureで直接読み書きします。
# Read CSV spark.read.csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') spark.read.option("header", True).csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # read with header in file # Write to CSV df.write.csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') df.write.option("header", True).csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # write with header in file # Read Text spark.read.text('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to Text df.write.text('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') df.write.format("text").mode("overwrite").save('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Read Parquet spark.read.parquet('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to Parquet df.write.parquet('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Read JSON spark.read.json('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to JSON df.write.json('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>')