Apache Spark™から Apache Iceberg™ テーブルをクエリするときにデータ保護ポリシーを適用する

このトピックでは、Snowflake Horizonカタログを介してApache Spark™を介してアクセスする場合に、Apache Iceberg™テーブルに設定されたデータ保護ポリシーを適用する方法について説明します。データ保護ポリシーを適用するには、Spark用Snowflakeコネクタ、Sparkコネクタ をインストールします。Sparkコネクタの詳細については、 Snowflake Connector for Spark をご参照ください。

Sparkコネクタは、Snowflakeを介してクエリをルーティングすることにより、Snowflakeポリシーで保護されているテーブルのクエリをサポートします。これにより、コンピューティングの効率的な使用と一貫した実施が保証されます。Sparkコネクタは、Snowflakeを介して書き込みをルーティングすることにより、Snowflakeポリシーで保護されているテーブルに対する書き込み操作の実行もサポートします。

注釈

Sparkコネクタは、Snowflake Horizonカタログを介したSparkセッションコンピューティングを使用することで、きめ細かいデータ保護ポリシーなしでApache Icebergテーブルを直接クエリすることもサポートします。

SparkからIcebergテーブルをクエリする際にデータ保護ポリシーを適用するワークフロー

SparkからIcebergテーブルをクエリする際にデータ保護ポリシーを適用するには、以下の手順を実行します。

  1. データ保護ポリシーを構成する

  2. Snowflake Sparkコネクタを使用してSparkをIcebergテーブルに接続する には、Spark用Snowflakeコネクタのダウンロードと、Snowflake Horizonカタログを通じてSparkをIcebergテーブルに接続することが含まれます。

  3. Icebergテーブルのクエリ

サポートされているデータ保護ポリシー

次のデータ保護ポリシーがサポートされています。

他のデータポリシーで保護されているテーブルに対するクエリはエラーになります。

前提条件

  • 次の情報を取得します。

    • テーブルをクエリするSnowflakeユーザーのユーザー名

    • クエリするテーブルを含むSnowflakeデータベースの名前

    • ポリシー評価に使用するSnowflakeの仮想ウェアハウスの名前

  • クエリするIcebergテーブルを含むSnowflakeアカウントのアカウント識別子を取得します。手順については、 アカウント識別子 をご参照ください。データアクセスポリシーが適用されたIcebergテーブルにSparkを接続する 場合に、この識別子を指定します。

    Tip

    SQL を使用してアカウント識別子を取得するには次のコマンドを実行できます。

    SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME();
    
    Copy

ステップ1:データ保護ポリシーを構成する

重要

クエリするIcebergテーブルにデータ保護ポリシーがすでに設定されている場合は、次のステップに進みます。

このステップでは、データ保護ポリシーを構成します。

ステップ2:Spark用Snowflakeコネクタを使用してSparkをIcebergテーブルに接続する

このステップでは、Horizonカタログを介してSparkをIcebergテーブルに接続します。この接続で、テーブルに適用されたデータ保護ポリシーでSparkを使用してテーブルをクエリできます。

Spark用Snowflakeコネクタ(Sparkコネクタ)を使用してSparkをIcebergテーブルに接続するには、まずSparkコネクタをダウンロードしてから、SparkをIcebergテーブルに接続します。

Python用Snowflakeコネクタをダウンロード

Spark用Snowflakeコネクタの3.1.6以降のバージョンをダウンロードするには、 Sparkコネクターのインストールと構成 の手順に従います。

SparkをIcebergテーブルに接続する

このステップでは、Horizonカタログを介してSparkをIcebergテーブルに接続します。この接続には、HorizonカタログでSpark用Snowflakeコネクタを使用して、Snowflakeデータ保護ポリシーで保護されているテーブルをクエリするための構成が含まれます。

注釈

外部 OAuth またはキーペア認証を使用した接続を使用する場合、 外部 OAuth またはキーペア認証を使用してSparkをIcebergテーブルに接続する をご参照ください。

  • プログラムのアクセストークン(PAT)を使用してSparkをIcebergテーブルに接続する場合、次の PySpark コード例を使用します:

    from pyspark.sql import SparkSession
    
    # Snowflake Horizon Catalog Configuration, change as per your environment
    
    CATALOG_URI = "https://<account_identifier>.snowflakecomputing.com/polaris/api/catalog"
    ROLE = "<role>"
    HORIZON_SESSION_ROLE = f"session:role:{ROLE}"
    CATALOG_NAME = "<database_name>" #provide in UPPER CASE
    SF_URL= "<account_identifier>.snowflakecomputing.com"
    SF_USER = "<user_name>" #provide in UPPER CASE
    SF_PASSWORD = "<user_password>"
    SF_SCHEMA = "<schema_name>" #provide in UPPER CASE
    SF_WAREHOUSE = "<warehouse_name>" #provide in UPPER CASE
    
    
    # Cloud Service Provider Region Configuration (where the Iceberg data is stored)
    REGION = "<region_name>"
    
    # Paste the External Oauth Access token that you generated in Snowflake here
    ACCESS_TOKEN = "<your_access_token>"
    
    # Paste the PAT you generated in Snowflake here
    PAT_TOKEN = "<your_PAT_token>"
    
    # Iceberg Version
    ICEBERG_VERSION = "1.9.1"
    
    #Snowflake Connector for Spark
    DRIVER_VERSION = "3.24.0" # (or above)
    SNOWFLAKE_CONNECTOR_VERSION = "3.1.6"
    
    
    try:
        spark.stop()
    except:
        pass
    
      spark = (
          SparkSession.builder
    
          .master("local[*]")
    .config("spark.ui.port", "0")
          .config("spark.driver.bindAddress", "127.0.0.1")
          .config("spark.driver.host", "127.0.0.1")
          .config("spark.driver.port", "0")
          .config("spark.blockManager.port", "0")
    
    
    # JAR Dependencies for Iceberg, Azure and Snowflake Connector for Spark
          .config(
     "spark.jars.packages",
     f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{ICEBERG_VERSION},"
     f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION},"
    
       # for Azure storage, use the below package and comment above azure bundle
              # f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}"
    # for Snowflake Connector for Spark
     f"net.snowflake:snowflake-jdbc:{DRIVER_VERSION},"
     f"net.snowflake:spark-snowflake_2.12:{SNOWFLAKE_CONNECTOR_VERSION}"
    
    )
          # Iceberg SQL Extensions
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .config("spark.sql.defaultCatalog", "horizoncatalog")
    .config("spark.sql.catalog.horizoncatalog", "org.apache.spark.sql.snowflake.catalog.SnowflakeFallbackCatalog")
    
      #Horizon REST Catalog Configuration
       .config(f"spark.sql.catalog.horizoncatalog.catalog-impl", "org.apache.iceberg.spark.SparkCatalog")
          .config(f"spark.sql.catalog.horizoncatalog.type", "rest")
          .config(f"spark.sql.catalog.horizoncatalog.uri", CATALOG_URI)
          .config(f"spark.sql.catalog.horizoncatalog.warehouse", CATALOG_NAME)
          .config(f"spark.sql.catalog.horizoncatalog.scope", HORIZON_SESSION_ROLE)
          .config(f"spark.sql.catalog.horizoncatalog.client.region", REGION)
          .config(f"spark.sql.catalog.horizoncatalog.credential", PAT_TOKEN)
    # for External Oauth use below and comment above configuration .token
    #.config(f"spark.sql.catalog.horizoncatalog.token", ACCESS_TOKEN)
    
    .config("spark.sql.catalog.horizoncatalog.io-impl","org.apache.iceberg.aws.s3.S3FileIO")
    # Enforcing policies using Snowflake Connector for Spark
    .config("spark.snowflake.sfURL", SF_URL)
    .config("spark.snowflake.sfUser", SF_USER)
    .config("spark.snowflake.sfPassword", SF_PASSWORD)
    # for External Oauth uncomment below and comment above configurations for user and password
    #.config("spark.snowflake.sfAuthenticator","oauth")
    #.config("spark.snowflake.sfToken",ACCESS_TOKEN)
    .config("spark.snowflake.sfDatabase", CATALOG_NAME)
    .config("spark.snowflake.sfSchema",SF_SCHEMA) # Optional
    .config("spark.snowflake.sfRole",ROLE)
    .config("spark.snowflake.sfWarehouse",SF_WAREHOUSE)
    
      # Required for vended credentials
     .config(f"spark.sql.catalog.horizoncatalog.header.X-Iceberg-Access-Delegation", "vended-credentials")
          .config("spark.sql.iceberg.vectorization.enabled", "false")
          .getOrCreate()
      )
      spark.sparkContext.setLogLevel("ERROR")
    
    Copy

    条件:

    • <account_identifier> は、クエリするIcebergテーブルを含むSnowflakeアカウントのSnowflakeアカウント識別子です。この識別子を見つけるには、 アカウント識別子 をご参照ください。

    • <your_access_token> は、取得したアクセストークンです。アクセストークンを取得するには、 認証用にアクセストークンを取得する をご参照ください。

      注釈

      外部 OAuth の場合は、アクセストークンを指定する代わりに、自動トークン更新を使用してエンジンへの接続を構成することもできます。

    • <database_name> は、クエリするSnowflake管理のIcebergテーブルを含むSnowflakeアカウントのデータベースの名前です。

      注釈

      Sparkの次のプロパティは、Snowflakeウェアハウス名ではなく、Snowflake データベース 名を想定しています。

      • .warehouse

      • .sfDatabase

    • <role> は、クエリするIcebergテーブルへのアクセスで構成されたSnowflakeのロールです。例: DATA_ENGINEER。

    • <user_name> は、Snowflakeのテーブルへのアクセスに使用されるユーザー名です。

    • <user_password> は、テーブルにアクセスするユーザーのパスワードです。

      注釈

      このパスワードは、該当する場合は、認証用に取得したプログラムアクセストークン(PAT)を費用できます。

    • <schema_name> は、テーブルが格納されているSnowflakeのスキーマです。これはオプションです。

    • <warehouse_name> は、ポリシーの評価に使用するSnowflakeウェアハウス(コンピュートインスタンス)名です。

    重要

    デフォルトでは、コード例はAmazon S3に保存された Apache Iceberg™ テーブル用に設定されています。IcebergテーブルがAzure Storage(ADLS)に保存されている場合は、次のステップを実行します:

    1. 次の行をコメントアウトします: f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}"

    2. 次の行のコメントを解除します: # f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}"

外部 OAuth またはキーペア認証を使用してSparkをIcebergテーブルに接続する

前のコード例は、プログラムによるアクセストークン(PAT)を使用して接続するための構成を示しています。

外部 OAuth またはキーペア認証を使用してSparkをIcebergテーブルに接続するには、以下の手順に従って前のコード例を変更します。

  1. <your_access_token> には、外部 OAuth またはキーペア認証のアクセストークンを指定します。

    アクセストークンを取得するには、 ステップ3:認証用アクセストークンの取得 をご参照ください。

  2. 次の行をコメントアウトします: .config(f"spark.sql.catalog.{CATALOG_NAME}.credential", PAT_TOKEN)

  3. 次の行のコメントを解除します: #.config(f"spark.sql.catalog.{CATALOG_NAME}.token", ACCESS_TOKEN)

ステップ3:Sparkを使用したIcebergテーブルのクエリ

Snowflakeデータ保護ポリシーで保護されているIcebergテーブルを読み取るには、Sparkを使用します。Sparkは、Snowflakeポリシーで保護されているテーブルのクエリをSnowflake経由で自動的にルーティングし、一貫した実施を確保できます。

テーブルをクエリする

spark.sql("SHOW NAMESPACES").show(truncate=False)
spark.sql("USE horizoncatalog.<schema_name>")
spark.sql("SHOW TABLES").show(truncate=False)
spark.sql("Select * from <your_table_name_in_snowflake>").show(truncate=False)
Copy

ポリシー評価のためにクエリをモニターする

ポリシー評価のためにSparkからSnowflakeにルーティングされるクエリのSnowflakeでクエリアクティビティをモニターするには、Snowflakeアカウントでクエリアクティビティをモニターできます。

データ保護ポリシーを構成する際の考慮事項

データ保護ポリシーを構成するときは、次の項目を考慮してください。

  • Sparkを使用してクエリするIcebergテーブルへのデータ保護ポリシーの適用は、テーブルに次のデータ保護ポリシーが設定されている場合にのみサポートされます。

    • マスキングポリシー

    • タグベースのマスキングポリシー

    • 行アクセスポリシー

    他のすべてのポリシーによって保護されているテーブルに対するクエリは、エラーになります。