Renforcer les politiques de protection des données lors de l’interrogation de tables Apache Iceberg™ Apache Spark™¶
Cette rubrique décrit comment appliquer des politiques de protection des données définies sur les tables Apache Iceberg™ lors de l’accès via Apache Spark™ via Snowflake Horizon Catalog. Pour appliquer des politiques de protection des données, vous installez Snowflake Connector pour Spark, le connecteur Spark. Pour plus d’informations sur le connecteur Spark, consultez Connecteur Snowflake pour Spark.
Le connecteur Spark prend en charge l’interrogation de tables protégées par des politiques Snowflake en acheminant la requête via Snowflake, ce qui garantit une utilisation efficace du calcul et une application cohérente. Le connecteur Spark prend également en charge l’exécution d’opérations d’écriture sur des tables protégées par des politiques Snowflake en acheminant l’écriture via Snowflake.
Note
Le connecteur Spark prend également en charge l’interrogation directe des tables Apache Iceberg sans politiques de protection des données détaillées via le calcul de session Spark via le catalogue Horizon Snowflake.
Flux de travail pour appliquer des politiques de protection des données lors de l’interrogation de tables Iceberg à partir de Spark¶
Pour appliquer des politiques de protection des données lors de l’interrogation de tables Iceberg à partir de Spark, procédez comme suit :
Connectez Spark avec le connecteur Snowflake Spark à des tables Iceberg, qui inclut le téléchargement de Snowflake Connector pour Spark et la connexion de Spark aux tables Iceberg via le catalogue Snowflake Horizon.
Politiques de protection des données prises en charge¶
Les politiques de protection des données suivantes sont prises en charge :
Les requêtes sur des tables qui sont protégées par une autre politique de données entraînent une erreur.
Conditions préalables¶
Récupérez les informations suivantes :
Le nom d’utilisateur de l’utilisateur Snowflake qui interrogera les tables
Le nom de la base de données Snowflake qui contient les tables que vous souhaitez interroger.
Le nom de l’entrepôt virtuel dans Snowflake à utiliser pour l’évaluation des politiques
Récupérez l’identificateur de compte de votre compte Snowflake contenant les tables Iceberg que vous souhaitez interroger. Pour obtenir des instructions, voir Identificateurs de compte. Vous devez spécifier cet identificateur lorsque vous connectez Spark à des tables Iceberg avec des politiques d’accès aux données appliquées.
Astuce
Pour obtenir votre identificateur de compte en utilisant SQL, exécutez la commande suivante :
SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME();
Étape 1 : configurer des politiques de protection des données¶
Important
Si vous avez déjà défini des politiques de protection des données sur les tables Iceberg que vous souhaitez interroger, passez à l’étape suivante.
Dans cette étape, vous configurez des politiques de protection des données.
Pour configurer des politiques de protection des données, définissez des politiques d’accès aux données sur les tables Iceberg que vous souhaitez interroger les tables :
Pour attribuer des politiques de masquage, voir Comprendre le masquage dynamique des données.
Pour attribuer des politiques de masquage basées sur des balises, voir Politiques de masquage basées sur les balises.
Pour attribuer des politiques d’accès aux lignes, voir Présentation des politiques d’accès aux lignes.
Étape 2 : connecter Spark avec Snowflake Connector pour Spark aux tables Iceberg¶
Dans cette étape, vous connectez Spark aux tables Iceberg via Horizon Catalog. Avec cette connexion, vous pouvez interroger les tables en utilisant Spark avec les politiques de protection des données appliquées aux tables.
Pour connecter Spark avec Snowflake Connector pour Spark (connecteur Spark) aux tables Iceberg, vous téléchargez d’abord le connecteur Spark, puis vous connectez Spark aux tables Iceberg.
Téléchargement de Snowflake Connector pour Spark¶
Pour télécharger 3.1.6 ou une version ultérieure de Snowflake Connector pour Spark, suivez les instructions dans Installation et configuration du connecteur Spark.
Connecter Spark aux tables Iceberg¶
Dans cette étape, vous connectez Spark aux tables Iceberg via Horizon Catalog. Cette connexion inclut des configurations pour que vous puissiez utiliser Snowflake Connector pour Spark avec le catalogue Horizon pour interroger les tables qui sont protégées par des politiques de protection des données Snowflake.
Note
Si vous utilisez OAuth externe ou l’authentification par paire de clés, voir Connecter Spark aux tables Iceberg via OAuth externe ou l’authentification par paire de clés.
Pour connecter Spark à des tables Iceberg en utilisant un jeton d’accès programmatique (PAT), utilisez le code PySpark exemple suivant :
from pyspark.sql import SparkSession # Snowflake Horizon Catalog Configuration, change as per your environment CATALOG_URI = "https://<account_identifier>.snowflakecomputing.com/polaris/api/catalog" ROLE = "<role>" HORIZON_SESSION_ROLE = f"session:role:{ROLE}" CATALOG_NAME = "<database_name>" #provide in UPPER CASE SF_URL= "<account_identifier>.snowflakecomputing.com" SF_USER = "<user_name>" #provide in UPPER CASE SF_PASSWORD = "<user_password>" SF_SCHEMA = "<schema_name>" #provide in UPPER CASE SF_WAREHOUSE = "<warehouse_name>" #provide in UPPER CASE # Cloud Service Provider Region Configuration (where the Iceberg data is stored) REGION = "<region_name>" # Paste the External Oauth Access token that you generated in Snowflake here ACCESS_TOKEN = "<your_access_token>" # Paste the PAT you generated in Snowflake here PAT_TOKEN = "<your_PAT_token>" # Iceberg Version ICEBERG_VERSION = "1.9.1" #Snowflake Connector for Spark DRIVER_VERSION = "3.24.0" # (or above) SNOWFLAKE_CONNECTOR_VERSION = "3.1.6" try: spark.stop() except: pass spark = ( SparkSession.builder .master("local[*]") .config("spark.ui.port", "0") .config("spark.driver.bindAddress", "127.0.0.1") .config("spark.driver.host", "127.0.0.1") .config("spark.driver.port", "0") .config("spark.blockManager.port", "0") # JAR Dependencies for Iceberg, Azure and Snowflake Connector for Spark .config( "spark.jars.packages", f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{ICEBERG_VERSION}," f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}," # for Azure storage, use the below package and comment above azure bundle # f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}" # for Snowflake Connector for Spark f"net.snowflake:snowflake-jdbc:{DRIVER_VERSION}," f"net.snowflake:spark-snowflake_2.12:{SNOWFLAKE_CONNECTOR_VERSION}" ) # Iceberg SQL Extensions .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.defaultCatalog", "horizoncatalog") .config("spark.sql.catalog.horizoncatalog", "org.apache.spark.sql.snowflake.catalog.SnowflakeFallbackCatalog") #Horizon REST Catalog Configuration .config(f"spark.sql.catalog.horizoncatalog.catalog-impl", "org.apache.iceberg.spark.SparkCatalog") .config(f"spark.sql.catalog.horizoncatalog.type", "rest") .config(f"spark.sql.catalog.horizoncatalog.uri", CATALOG_URI) .config(f"spark.sql.catalog.horizoncatalog.warehouse", CATALOG_NAME) .config(f"spark.sql.catalog.horizoncatalog.scope", HORIZON_SESSION_ROLE) .config(f"spark.sql.catalog.horizoncatalog.client.region", REGION) .config(f"spark.sql.catalog.horizoncatalog.credential", PAT_TOKEN) # for External Oauth use below and comment above configuration .token #.config(f"spark.sql.catalog.horizoncatalog.token", ACCESS_TOKEN) .config("spark.sql.catalog.horizoncatalog.io-impl","org.apache.iceberg.aws.s3.S3FileIO") # Enforcing policies using Snowflake Connector for Spark .config("spark.snowflake.sfURL", SF_URL) .config("spark.snowflake.sfUser", SF_USER) .config("spark.snowflake.sfPassword", SF_PASSWORD) # for External Oauth uncomment below and comment above configurations for user and password #.config("spark.snowflake.sfAuthenticator","oauth") #.config("spark.snowflake.sfToken",ACCESS_TOKEN) .config("spark.snowflake.sfDatabase", CATALOG_NAME) .config("spark.snowflake.sfSchema",SF_SCHEMA) # Optional .config("spark.snowflake.sfRole",ROLE) .config("spark.snowflake.sfWarehouse",SF_WAREHOUSE) # Required for vended credentials .config(f"spark.sql.catalog.horizoncatalog.header.X-Iceberg-Access-Delegation", "vended-credentials") .config("spark.sql.iceberg.vectorization.enabled", "false") .getOrCreate() ) spark.sparkContext.setLogLevel("ERROR")
Où :
<account_identifier>est votre identificateur de compte Snowflake pour le compte Snowflake qui contient les tables Iceberg que vous souhaitez interroger. Pour trouver cet identificateur, voir Identificateurs de compte.<your_access_token>est le jeton d’accès que vous avez obtenu. Pour obtenir un jeton d’accès, voir Obtenir un jeton d’accès pour l’authentification.Note
Pour OAuth externe, vous pouvez également configurer votre connexion au moteur avec l’actualisation automatique du jeton au lieu de spécifier un jeton d’accès.
<database_name>est le nom de la base de données de votre compte Snowflake qui contient les tables Iceberg gérées par Snowflake que vous souhaitez interroger.Note
Les propriétés suivantes dans Spark attendent le nom de votre base de données Snowflake, et non le nom de votre entrepôt Snowflake :
.warehouse.sfDatabase
<role>est le rôle dans Snowflake qui est configuré avec accès aux tables Iceberg que vous souhaitez interroger. Par exemple : DATA_ENGINEER.<user_name>est le nom d’utilisateur utilisé pour accéder aux tables dans Snowflake.<user_password>est le mot de passe de l’utilisateur accédant aux tables.Note
Ce mot de passe peut être le jeton d’accès programmatique (PAT) que vous avez obtenu pour l’authentification, le cas échéant.
<schema_name>est le schéma dans Snowflake où les tables sont stockées. Cela est facultatif.<warehouse_name>est le nom de l’entrepôt Snowflake (instance de calcul) que vous souhaitez utiliser pour évaluer les politiques.
Important
Par défaut, l’exemple de code est configuré pour les tables Apache Iceberg™ stockées sur Amazon S3. Si vos tables Iceberg sont stockées sur Azure Storage (ADLS), procédez comme suit :
Commentez la ligne suivante :
f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}"Ne commentez pas la ligne suivante :
# f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}"
Connecter Spark aux tables Iceberg via OAuth externe ou l’authentification par paire de clés¶
L’exemple de code précédent montre la configuration pour se connecter en utilisant un jeton d’accès programmatique (PAT).
Pour connecter Spark aux tables Iceberg en utilisant OAuth externe ou l’authentification par paire de clés, suivez ces étapes pour modifier l’exemple de code précédent :
Pour
<your_access_token>, spécifiez votre jeton d’accès pour OAuth externe ou l’authentification par paire de clés.Pour obtenir un jeton d’accès, voir Étape 3 : Obtenir un jeton d’accès pour l’authentification.
Commentez la ligne suivante :
.config(f"spark.sql.catalog.{CATALOG_NAME}.credential", PAT_TOKEN)Ne commentez pas la ligne suivante :
#.config(f"spark.sql.catalog.{CATALOG_NAME}.token", ACCESS_TOKEN)
Étape 3 : interroger des tables Iceberg à l’aide de Spark¶
Utilisez Spark pour lire les tables Iceberg qui sont protégées par les politiques de protection des données Snowflake. Spark peut acheminer automatiquement les requêtes des tables protégées par des politiques Snowflake via Snowflake pour garantir une application cohérente.
Interroger une table¶
spark.sql("SHOW NAMESPACES").show(truncate=False)
spark.sql("USE horizoncatalog.<schema_name>")
spark.sql("SHOW TABLES").show(truncate=False)
spark.sql("Select * from <your_table_name_in_snowflake>").show(truncate=False)
Surveiller une requête pour l’évaluation des politiques¶
Vous pouvez surveiller l’activité des requêtes dans Snowflake pour les requêtes qui sont acheminées de Spark vers Snowflake pour l’évaluation des politiques dans votre compte Snowflake.
Pour surveiller l’historique des requêtes dans Snowflake, suivez les instructions dans Surveillance de l’activité des requêtes avec l’historique des requêtes.
Considérations relatives à la configuration de politiques de protection des données¶
Tenez compte des éléments suivants lorsque vous configurez des politiques de protection des données :
L’application de politiques de protection des données sur les tables Iceberg que vous interrogez à l’aide de Spark n’est prise en charge que lorsque les politiques de protection des données suivantes sont définies sur les tables :
Politiques de masquage
Politiques de masquage basées sur les balises
Politiques d’accès aux lignes
Les requêtes sur des tables qui sont protégées par toutes les autres politiques entraîneront une erreur.