Apache Spark™から Apache Iceberg™ テーブルをクエリするときにデータ保護ポリシーを適用する¶
このトピックでは、Snowflake Horizonカタログを介してApache Spark™を介してアクセスする場合に、Apache Iceberg™テーブルに設定されたデータ保護ポリシーを適用する方法について説明します。データ保護ポリシーを適用するには、Spark用Snowflakeコネクタ、Sparkコネクタ をインストールします。Sparkコネクタの詳細については、 Snowflake Connector for Spark をご参照ください。
Sparkコネクタは、Snowflakeを介してクエリをルーティングすることにより、Snowflakeポリシーで保護されているテーブルのクエリをサポートします。これにより、コンピューティングの効率的な使用と一貫した実施が保証されます。Sparkコネクタは、Snowflakeを介して書き込みをルーティングすることにより、Snowflakeポリシーで保護されているテーブルに対する書き込み操作の実行もサポートします。
注釈
Sparkコネクタは、Snowflake Horizonカタログを介したSparkセッションコンピューティングを使用することで、きめ細かいデータ保護ポリシーなしでApache Icebergテーブルを直接クエリすることもサポートします。
SparkからIcebergテーブルをクエリする際にデータ保護ポリシーを適用するワークフロー¶
SparkからIcebergテーブルをクエリする際にデータ保護ポリシーを適用するには、以下の手順を実行します。
Snowflake Sparkコネクタを使用してSparkをIcebergテーブルに接続する には、Spark用Snowflakeコネクタのダウンロードと、Snowflake Horizonカタログを通じてSparkをIcebergテーブルに接続することが含まれます。
サポートされているデータ保護ポリシー¶
次のデータ保護ポリシーがサポートされています。
他のデータポリシーで保護されているテーブルに対するクエリはエラーになります。
前提条件¶
次の情報を取得します。
テーブルをクエリするSnowflakeユーザーのユーザー名
クエリするテーブルを含むSnowflakeデータベースの名前
ポリシー評価に使用するSnowflakeの仮想ウェアハウスの名前
クエリするIcebergテーブルを含むSnowflakeアカウントのアカウント識別子を取得します。手順については、 アカウント識別子 をご参照ください。データアクセスポリシーが適用されたIcebergテーブルにSparkを接続する 場合に、この識別子を指定します。
Tip
SQL を使用してアカウント識別子を取得するには次のコマンドを実行できます。
SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME();
ステップ1:データ保護ポリシーを構成する¶
重要
クエリするIcebergテーブルにデータ保護ポリシーがすでに設定されている場合は、次のステップに進みます。
このステップでは、データ保護ポリシーを構成します。
データ保護ポリシーを構成するには、クエリを実行するIcebergテーブルにデータアクセスポリシーを設定します。
マスキングポリシーを割り当てるには、 ダイナミックデータマスキングについて をご参照ください。
タグベースのマスキングポリシーを割り当てるには、 タグベースのマスキングポリシー をご参照ください。
行アクセスポリシーを割り当てるには、 行アクセスポリシーについて をご参照ください。
ステップ2:Spark用Snowflakeコネクタを使用してSparkをIcebergテーブルに接続する¶
このステップでは、Horizonカタログを介してSparkをIcebergテーブルに接続します。この接続で、テーブルに適用されたデータ保護ポリシーでSparkを使用してテーブルをクエリできます。
Spark用Snowflakeコネクタ(Sparkコネクタ)を使用してSparkをIcebergテーブルに接続するには、まずSparkコネクタをダウンロードしてから、SparkをIcebergテーブルに接続します。
Python用Snowflakeコネクタをダウンロード¶
Spark用Snowflakeコネクタの3.1.6以降のバージョンをダウンロードするには、 Sparkコネクターのインストールと構成 の手順に従います。
SparkをIcebergテーブルに接続する¶
このステップでは、Horizonカタログを介してSparkをIcebergテーブルに接続します。この接続には、HorizonカタログでSpark用Snowflakeコネクタを使用して、Snowflakeデータ保護ポリシーで保護されているテーブルをクエリするための構成が含まれます。
注釈
外部 OAuth またはキーペア認証を使用した接続を使用する場合、 外部 OAuth またはキーペア認証を使用してSparkをIcebergテーブルに接続する をご参照ください。
プログラムのアクセストークン(PAT)を使用してSparkをIcebergテーブルに接続する場合、次の PySpark コード例を使用します:
from pyspark.sql import SparkSession # Snowflake Horizon Catalog Configuration, change as per your environment CATALOG_URI = "https://<account_identifier>.snowflakecomputing.com/polaris/api/catalog" ROLE = "<role>" HORIZON_SESSION_ROLE = f"session:role:{ROLE}" CATALOG_NAME = "<database_name>" #provide in UPPER CASE SF_URL= "<account_identifier>.snowflakecomputing.com" SF_USER = "<user_name>" #provide in UPPER CASE SF_PASSWORD = "<user_password>" SF_SCHEMA = "<schema_name>" #provide in UPPER CASE SF_WAREHOUSE = "<warehouse_name>" #provide in UPPER CASE # Cloud Service Provider Region Configuration (where the Iceberg data is stored) REGION = "<region_name>" # Paste the External Oauth Access token that you generated in Snowflake here ACCESS_TOKEN = "<your_access_token>" # Paste the PAT you generated in Snowflake here PAT_TOKEN = "<your_PAT_token>" # Iceberg Version ICEBERG_VERSION = "1.9.1" #Snowflake Connector for Spark DRIVER_VERSION = "3.24.0" # (or above) SNOWFLAKE_CONNECTOR_VERSION = "3.1.6" try: spark.stop() except: pass spark = ( SparkSession.builder .master("local[*]") .config("spark.ui.port", "0") .config("spark.driver.bindAddress", "127.0.0.1") .config("spark.driver.host", "127.0.0.1") .config("spark.driver.port", "0") .config("spark.blockManager.port", "0") # JAR Dependencies for Iceberg, Azure and Snowflake Connector for Spark .config( "spark.jars.packages", f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{ICEBERG_VERSION}," f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}," # for Azure storage, use the below package and comment above azure bundle # f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}" # for Snowflake Connector for Spark f"net.snowflake:snowflake-jdbc:{DRIVER_VERSION}," f"net.snowflake:spark-snowflake_2.12:{SNOWFLAKE_CONNECTOR_VERSION}" ) # Iceberg SQL Extensions .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.defaultCatalog", "horizoncatalog") .config("spark.sql.catalog.horizoncatalog", "org.apache.spark.sql.snowflake.catalog.SnowflakeFallbackCatalog") #Horizon REST Catalog Configuration .config(f"spark.sql.catalog.horizoncatalog.catalog-impl", "org.apache.iceberg.spark.SparkCatalog") .config(f"spark.sql.catalog.horizoncatalog.type", "rest") .config(f"spark.sql.catalog.horizoncatalog.uri", CATALOG_URI) .config(f"spark.sql.catalog.horizoncatalog.warehouse", CATALOG_NAME) .config(f"spark.sql.catalog.horizoncatalog.scope", HORIZON_SESSION_ROLE) .config(f"spark.sql.catalog.horizoncatalog.client.region", REGION) .config(f"spark.sql.catalog.horizoncatalog.credential", PAT_TOKEN) # for External Oauth use below and comment above configuration .token #.config(f"spark.sql.catalog.horizoncatalog.token", ACCESS_TOKEN) .config("spark.sql.catalog.horizoncatalog.io-impl","org.apache.iceberg.aws.s3.S3FileIO") # Enforcing policies using Snowflake Connector for Spark .config("spark.snowflake.sfURL", SF_URL) .config("spark.snowflake.sfUser", SF_USER) .config("spark.snowflake.sfPassword", SF_PASSWORD) # for External Oauth uncomment below and comment above configurations for user and password #.config("spark.snowflake.sfAuthenticator","oauth") #.config("spark.snowflake.sfToken",ACCESS_TOKEN) .config("spark.snowflake.sfDatabase", CATALOG_NAME) .config("spark.snowflake.sfSchema",SF_SCHEMA) # Optional .config("spark.snowflake.sfRole",ROLE) .config("spark.snowflake.sfWarehouse",SF_WAREHOUSE) # Required for vended credentials .config(f"spark.sql.catalog.horizoncatalog.header.X-Iceberg-Access-Delegation", "vended-credentials") .config("spark.sql.iceberg.vectorization.enabled", "false") .getOrCreate() ) spark.sparkContext.setLogLevel("ERROR")
条件:
<account_identifier>は、クエリするIcebergテーブルを含むSnowflakeアカウントのSnowflakeアカウント識別子です。この識別子を見つけるには、 アカウント識別子 をご参照ください。<your_access_token>は、取得したアクセストークンです。アクセストークンを取得するには、 認証用にアクセストークンを取得する をご参照ください。注釈
外部 OAuth の場合は、アクセストークンを指定する代わりに、自動トークン更新を使用してエンジンへの接続を構成することもできます。
<database_name>は、クエリするSnowflake管理のIcebergテーブルを含むSnowflakeアカウントのデータベースの名前です。注釈
Sparkの次のプロパティは、Snowflakeウェアハウス名ではなく、Snowflake データベース 名を想定しています。
.warehouse.sfDatabase
<role>は、クエリするIcebergテーブルへのアクセスで構成されたSnowflakeのロールです。例: DATA_ENGINEER。<user_name>は、Snowflakeのテーブルへのアクセスに使用されるユーザー名です。<user_password>は、テーブルにアクセスするユーザーのパスワードです。注釈
このパスワードは、該当する場合は、認証用に取得したプログラムアクセストークン(PAT)を費用できます。
<schema_name>は、テーブルが格納されているSnowflakeのスキーマです。これはオプションです。<warehouse_name>は、ポリシーの評価に使用するSnowflakeウェアハウス(コンピュートインスタンス)名です。
重要
デフォルトでは、コード例はAmazon S3に保存された Apache Iceberg™ テーブル用に設定されています。IcebergテーブルがAzure Storage(ADLS)に保存されている場合は、次のステップを実行します:
次の行をコメントアウトします:
f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}"次の行のコメントを解除します:
# f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}"
外部 OAuth またはキーペア認証を使用してSparkをIcebergテーブルに接続する¶
前のコード例は、プログラムによるアクセストークン(PAT)を使用して接続するための構成を示しています。
外部 OAuth またはキーペア認証を使用してSparkをIcebergテーブルに接続するには、以下の手順に従って前のコード例を変更します。
<your_access_token>には、外部 OAuth またはキーペア認証のアクセストークンを指定します。アクセストークンを取得するには、 ステップ3:認証用アクセストークンの取得 をご参照ください。
次の行をコメントアウトします:
.config(f"spark.sql.catalog.{CATALOG_NAME}.credential", PAT_TOKEN)次の行のコメントを解除します:
#.config(f"spark.sql.catalog.{CATALOG_NAME}.token", ACCESS_TOKEN)
ステップ3:Sparkを使用したIcebergテーブルのクエリ¶
Snowflakeデータ保護ポリシーで保護されているIcebergテーブルを読み取るには、Sparkを使用します。Sparkは、Snowflakeポリシーで保護されているテーブルのクエリをSnowflake経由で自動的にルーティングし、一貫した実施を確保できます。
テーブルをクエリする¶
spark.sql("SHOW NAMESPACES").show(truncate=False)
spark.sql("USE horizoncatalog.<schema_name>")
spark.sql("SHOW TABLES").show(truncate=False)
spark.sql("Select * from <your_table_name_in_snowflake>").show(truncate=False)
ポリシー評価のためにクエリをモニターする¶
ポリシー評価のためにSparkからSnowflakeにルーティングされるクエリのSnowflakeでクエリアクティビティをモニターするには、Snowflakeアカウントでクエリアクティビティをモニターできます。
Snowflakeでクエリ履歴をモニターするには、 クエリ履歴でクエリのアクティビティをモニターする の指示に従ってください。
データ保護ポリシーを構成する際の考慮事項¶
データ保護ポリシーを構成するときは、次の項目を考慮してください。
Sparkを使用してクエリするIcebergテーブルへのデータ保護ポリシーの適用は、テーブルに次のデータ保護ポリシーが設定されている場合にのみサポートされます。
マスキングポリシー
タグベースのマスキングポリシー
行アクセスポリシー
他のすべてのポリシーによって保護されているテーブルに対するクエリは、エラーになります。