Renforcer les politiques de protection des données lors de l’interrogation de tables Apache Iceberg™ Apache Spark™

Cette rubrique décrit comment appliquer des politiques de protection des données définies sur les tables Apache Iceberg™ lors de l’accès via Apache Spark™ via Snowflake Horizon Catalog. Pour appliquer des politiques de protection des données, vous installez Snowflake Connector pour Spark, le connecteur Spark. Pour plus d’informations sur le connecteur Spark, consultez Connecteur Snowflake pour Spark.

Le connecteur Spark prend en charge l’interrogation de tables protégées par des politiques Snowflake en acheminant la requête via Snowflake, ce qui garantit une utilisation efficace du calcul et une application cohérente. Le connecteur Spark prend également en charge l’exécution d’opérations d’écriture sur des tables protégées par des politiques Snowflake en acheminant l’écriture via Snowflake.

Note

Le connecteur Spark prend également en charge l’interrogation directe des tables Apache Iceberg sans politiques de protection des données détaillées via le calcul de session Spark via le catalogue Horizon Snowflake.

Flux de travail pour appliquer des politiques de protection des données lors de l’interrogation de tables Iceberg à partir de Spark

Pour appliquer des politiques de protection des données lors de l’interrogation de tables Iceberg à partir de Spark, procédez comme suit :

  1. Configurez des politiques de protection des données.

  2. Connectez Spark avec le connecteur Snowflake Spark à des tables Iceberg, qui inclut le téléchargement de Snowflake Connector pour Spark et la connexion de Spark aux tables Iceberg via le catalogue Snowflake Horizon.

  3. Interrogez des tables Iceberg.

Politiques de protection des données prises en charge

Les politiques de protection des données suivantes sont prises en charge :

Les requêtes sur des tables qui sont protégées par une autre politique de données entraînent une erreur.

Conditions préalables

  • Récupérez les informations suivantes :

    • Le nom d’utilisateur de l’utilisateur Snowflake qui interrogera les tables

    • Le nom de la base de données Snowflake qui contient les tables que vous souhaitez interroger.

    • Le nom de l’entrepôt virtuel dans Snowflake à utiliser pour l’évaluation des politiques

  • Récupérez l’identificateur de compte de votre compte Snowflake contenant les tables Iceberg que vous souhaitez interroger. Pour obtenir des instructions, voir Identificateurs de compte. Vous devez spécifier cet identificateur lorsque vous connectez Spark à des tables Iceberg avec des politiques d’accès aux données appliquées.

    Astuce

    Pour obtenir votre identificateur de compte en utilisant SQL, exécutez la commande suivante :

    SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME();
    
    Copy

Étape 1 : configurer des politiques de protection des données

Important

Si vous avez déjà défini des politiques de protection des données sur les tables Iceberg que vous souhaitez interroger, passez à l’étape suivante.

Dans cette étape, vous configurez des politiques de protection des données.

Étape 2 : connecter Spark avec Snowflake Connector pour Spark aux tables Iceberg

Dans cette étape, vous connectez Spark aux tables Iceberg via Horizon Catalog. Avec cette connexion, vous pouvez interroger les tables en utilisant Spark avec les politiques de protection des données appliquées aux tables.

Pour connecter Spark avec Snowflake Connector pour Spark (connecteur Spark) aux tables Iceberg, vous téléchargez d’abord le connecteur Spark, puis vous connectez Spark aux tables Iceberg.

Téléchargement de Snowflake Connector pour Spark

Pour télécharger 3.1.6 ou une version ultérieure de Snowflake Connector pour Spark, suivez les instructions dans Installation et configuration du connecteur Spark.

Connecter Spark aux tables Iceberg

Dans cette étape, vous connectez Spark aux tables Iceberg via Horizon Catalog. Cette connexion inclut des configurations pour que vous puissiez utiliser Snowflake Connector pour Spark avec le catalogue Horizon pour interroger les tables qui sont protégées par des politiques de protection des données Snowflake.

Note

Si vous utilisez OAuth externe ou l’authentification par paire de clés, voir Connecter Spark aux tables Iceberg via OAuth externe ou l’authentification par paire de clés.

  • Pour connecter Spark à des tables Iceberg en utilisant un jeton d’accès programmatique (PAT), utilisez le code PySpark exemple suivant :

    from pyspark.sql import SparkSession
    
    # Snowflake Horizon Catalog Configuration, change as per your environment
    
    CATALOG_URI = "https://<account_identifier>.snowflakecomputing.com/polaris/api/catalog"
    ROLE = "<role>"
    HORIZON_SESSION_ROLE = f"session:role:{ROLE}"
    CATALOG_NAME = "<database_name>" #provide in UPPER CASE
    SF_URL= "<account_identifier>.snowflakecomputing.com"
    SF_USER = "<user_name>" #provide in UPPER CASE
    SF_PASSWORD = "<user_password>"
    SF_SCHEMA = "<schema_name>" #provide in UPPER CASE
    SF_WAREHOUSE = "<warehouse_name>" #provide in UPPER CASE
    
    
    # Cloud Service Provider Region Configuration (where the Iceberg data is stored)
    REGION = "<region_name>"
    
    # Paste the External Oauth Access token that you generated in Snowflake here
    ACCESS_TOKEN = "<your_access_token>"
    
    # Paste the PAT you generated in Snowflake here
    PAT_TOKEN = "<your_PAT_token>"
    
    # Iceberg Version
    ICEBERG_VERSION = "1.9.1"
    
    #Snowflake Connector for Spark
    DRIVER_VERSION = "3.24.0" # (or above)
    SNOWFLAKE_CONNECTOR_VERSION = "3.1.6"
    
    
    try:
        spark.stop()
    except:
        pass
    
      spark = (
          SparkSession.builder
    
          .master("local[*]")
    .config("spark.ui.port", "0")
          .config("spark.driver.bindAddress", "127.0.0.1")
          .config("spark.driver.host", "127.0.0.1")
          .config("spark.driver.port", "0")
          .config("spark.blockManager.port", "0")
    
    
    # JAR Dependencies for Iceberg, Azure and Snowflake Connector for Spark
          .config(
     "spark.jars.packages",
     f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{ICEBERG_VERSION},"
     f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION},"
    
       # for Azure storage, use the below package and comment above azure bundle
              # f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}"
    # for Snowflake Connector for Spark
     f"net.snowflake:snowflake-jdbc:{DRIVER_VERSION},"
     f"net.snowflake:spark-snowflake_2.12:{SNOWFLAKE_CONNECTOR_VERSION}"
    
    )
          # Iceberg SQL Extensions
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .config("spark.sql.defaultCatalog", "horizoncatalog")
    .config("spark.sql.catalog.horizoncatalog", "org.apache.spark.sql.snowflake.catalog.SnowflakeFallbackCatalog")
    
      #Horizon REST Catalog Configuration
       .config(f"spark.sql.catalog.horizoncatalog.catalog-impl", "org.apache.iceberg.spark.SparkCatalog")
          .config(f"spark.sql.catalog.horizoncatalog.type", "rest")
          .config(f"spark.sql.catalog.horizoncatalog.uri", CATALOG_URI)
          .config(f"spark.sql.catalog.horizoncatalog.warehouse", CATALOG_NAME)
          .config(f"spark.sql.catalog.horizoncatalog.scope", HORIZON_SESSION_ROLE)
          .config(f"spark.sql.catalog.horizoncatalog.client.region", REGION)
          .config(f"spark.sql.catalog.horizoncatalog.credential", PAT_TOKEN)
    # for External Oauth use below and comment above configuration .token
    #.config(f"spark.sql.catalog.horizoncatalog.token", ACCESS_TOKEN)
    
    .config("spark.sql.catalog.horizoncatalog.io-impl","org.apache.iceberg.aws.s3.S3FileIO")
    # Enforcing policies using Snowflake Connector for Spark
    .config("spark.snowflake.sfURL", SF_URL)
    .config("spark.snowflake.sfUser", SF_USER)
    .config("spark.snowflake.sfPassword", SF_PASSWORD)
    # for External Oauth uncomment below and comment above configurations for user and password
    #.config("spark.snowflake.sfAuthenticator","oauth")
    #.config("spark.snowflake.sfToken",ACCESS_TOKEN)
    .config("spark.snowflake.sfDatabase", CATALOG_NAME)
    .config("spark.snowflake.sfSchema",SF_SCHEMA) # Optional
    .config("spark.snowflake.sfRole",ROLE)
    .config("spark.snowflake.sfWarehouse",SF_WAREHOUSE)
    
      # Required for vended credentials
     .config(f"spark.sql.catalog.horizoncatalog.header.X-Iceberg-Access-Delegation", "vended-credentials")
          .config("spark.sql.iceberg.vectorization.enabled", "false")
          .getOrCreate()
      )
      spark.sparkContext.setLogLevel("ERROR")
    
    Copy

    Où :

    • <account_identifier> est votre identificateur de compte Snowflake pour le compte Snowflake qui contient les tables Iceberg que vous souhaitez interroger. Pour trouver cet identificateur, voir Identificateurs de compte.

    • <your_access_token> est le jeton d’accès que vous avez obtenu. Pour obtenir un jeton d’accès, voir Obtenir un jeton d’accès pour l’authentification.

      Note

      Pour OAuth externe, vous pouvez également configurer votre connexion au moteur avec l’actualisation automatique du jeton au lieu de spécifier un jeton d’accès.

    • <database_name> est le nom de la base de données de votre compte Snowflake qui contient les tables Iceberg gérées par Snowflake que vous souhaitez interroger.

      Note

      Les propriétés suivantes dans Spark attendent le nom de votre base de données Snowflake, et non le nom de votre entrepôt Snowflake :

      • .warehouse

      • .sfDatabase

    • <role> est le rôle dans Snowflake qui est configuré avec accès aux tables Iceberg que vous souhaitez interroger. Par exemple : DATA_ENGINEER.

    • <user_name> est le nom d’utilisateur utilisé pour accéder aux tables dans Snowflake.

    • <user_password> est le mot de passe de l’utilisateur accédant aux tables.

      Note

      Ce mot de passe peut être le jeton d’accès programmatique (PAT) que vous avez obtenu pour l’authentification, le cas échéant.

    • <schema_name> est le schéma dans Snowflake où les tables sont stockées. Cela est facultatif.

    • <warehouse_name> est le nom de l’entrepôt Snowflake (instance de calcul) que vous souhaitez utiliser pour évaluer les politiques.

    Important

    Par défaut, l’exemple de code est configuré pour les tables Apache Iceberg™ stockées sur Amazon S3. Si vos tables Iceberg sont stockées sur Azure Storage (ADLS), procédez comme suit :

    1. Commentez la ligne suivante : f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}"

    2. Ne commentez pas la ligne suivante : # f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}"

Connecter Spark aux tables Iceberg via OAuth externe ou l’authentification par paire de clés

L’exemple de code précédent montre la configuration pour se connecter en utilisant un jeton d’accès programmatique (PAT).

Pour connecter Spark aux tables Iceberg en utilisant OAuth externe ou l’authentification par paire de clés, suivez ces étapes pour modifier l’exemple de code précédent :

  1. Pour <your_access_token>, spécifiez votre jeton d’accès pour OAuth externe ou l’authentification par paire de clés.

    Pour obtenir un jeton d’accès, voir Étape 3 : Obtenir un jeton d’accès pour l’authentification.

  2. Commentez la ligne suivante : .config(f"spark.sql.catalog.{CATALOG_NAME}.credential", PAT_TOKEN)

  3. Ne commentez pas la ligne suivante : #.config(f"spark.sql.catalog.{CATALOG_NAME}.token", ACCESS_TOKEN)

Étape 3 : interroger des tables Iceberg à l’aide de Spark

Utilisez Spark pour lire les tables Iceberg qui sont protégées par les politiques de protection des données Snowflake. Spark peut acheminer automatiquement les requêtes des tables protégées par des politiques Snowflake via Snowflake pour garantir une application cohérente.

Interroger une table

spark.sql("SHOW NAMESPACES").show(truncate=False)
spark.sql("USE horizoncatalog.<schema_name>")
spark.sql("SHOW TABLES").show(truncate=False)
spark.sql("Select * from <your_table_name_in_snowflake>").show(truncate=False)
Copy

Surveiller une requête pour l’évaluation des politiques

Vous pouvez surveiller l’activité des requêtes dans Snowflake pour les requêtes qui sont acheminées de Spark vers Snowflake pour l’évaluation des politiques dans votre compte Snowflake.

Considérations relatives à la configuration de politiques de protection des données

Tenez compte des éléments suivants lorsque vous configurez des politiques de protection des données :

  • L’application de politiques de protection des données sur les tables Iceberg que vous interrogez à l’aide de Spark n’est prise en charge que lorsque les politiques de protection des données suivantes sont définies sur les tables :

    • Politiques de masquage

    • Politiques de masquage basées sur les balises

    • Politiques d’accès aux lignes

    Les requêtes sur des tables qui sont protégées par toutes les autres politiques entraîneront une erreur.