Aplicar políticas de proteção de dados ao consultar tabelas Apache Iceberg™ pelo Apache Spark™

Este tópico descreve como aplicar políticas de proteção de dados definidas em tabelas Apache Iceberg™ quando acessadas pelo Apache Spark™ por meio do Snowflake Horizon Catalog. Para aplicar políticas de proteção de dados, instale o conector Snowflake para Spark: conector Spark. Para obter mais informações sobre o conector Spark, consulte Conector Snowflake para Spark.

O conector Spark oferece suporte à consulta de tabelas protegidas pelas políticas do Snowflake roteando a consulta por meio do Snowflake, o que garante o uso eficiente da computação e uma aplicação consistente. O conector Spark também oferece suporte a operações de gravação em tabelas protegidas pelas políticas do Snowflake roteando a gravação por meio do Snowflake.

Nota

O conector Spark também oferece suporte à consulta direta de tabelas Apache Iceberg sem políticas de proteção de dados refinadas usando a computação de sessão do Spark por meio do Snowflake Horizon Catalog.

Fluxo de trabalho para aplicar políticas de proteção de dados ao consultar tabelas Iceberg pelo Spark

Para aplicar políticas de proteção de dados ao consultar tabelas Iceberg pelo Spark, conclua as seguintes etapas:

  1. Configurar políticas de proteção de dados.

  2. Conectar o Spark com o conector Snowflake Spark a tabelas Iceberg, o que inclui o download do conector Snowflake para Spark e a conexão do Spark a tabelas Iceberg por meio do Snowflake Horizon Catalog.

  3. Consultar tabelas Iceberg.

Políticas de proteção de dados compatíveis

As seguintes políticas de proteção de dados são compatíveis:

Consultas em tabelas protegidas por qualquer outra política de dados resultam em um erro.

Pré-requisitos

  • Recupere as seguintes informações:

    • O nome de usuário do Snowflake que consultará as tabelas

    • O nome do banco de dados Snowflake com as tabelas que você deseja consultar

    • O nome do warehouse virtual no Snowflake a ser usado para avaliação das políticas

  • Recupere o identificador da conta Snowflake que contém as tabelas Iceberg que você deseja consultar. Para obter instruções, consulte Identificadores de conta. Você especifica esse identificador ao conectar o Spark a tabelas Iceberg com políticas de acesso a dados aplicadas.

    Dica

    Para obter o identificador da sua conta usando SQL, execute o seguinte comando:

    SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME();
    
    Copy

Etapa 1: Configurar políticas de proteção de dados

Importante

Se você já definiu políticas de proteção de dados nas tabelas Iceberg que deseja consultar, prossiga para a próxima etapa.

Nesta etapa, você configura as políticas de proteção de dados.

Etapa 2: Conectar o Spark com o conector Snowflake para Spark a tabelas Iceberg

Nesta etapa, você conecta o Spark às tabelas Iceberg por meio do Horizon Catalog. Com essa conexão, é possível consultar as tabelas usando o Spark com as políticas de proteção de dados aplicadas às tabelas.

Para conectar o Spark com o conector Snowflake para Spark (conector Spark) a tabelas Iceberg, primeiro baixe o conector Spark e depois o conecte o Spark às tabelas Iceberg.

Baixar o conector Snowflake para Spark

Para baixar a versão 3.1.6 ou uma versão posterior do conector Snowflake para Spark, siga as instruções em Instalação e configuração do conector Spark.

Conectar o Spark a tabelas Iceberg

Nesta etapa, você conecta o Spark às tabelas Iceberg por meio do Horizon Catalog. Essa conexão inclui configurações para você usar o conector Snowflake para Spark com o Horizon Catalog para consultar as tabelas protegidas pelas políticas de proteção de dados do Snowflake.

Nota

Se você usa uma autenticação do OAuth externo ou de par de chaves, consulte Conectar o Spark a tabelas Iceberg usando autenticação do OAuth externo ou de par de chaves.

  • Para conectar o Spark às tabelas Iceberg usando um token de acesso programático (Programmatic Access Token, PAT), use o seguinte código PySpark de exemplo:

    from pyspark.sql import SparkSession
    
    # Snowflake Horizon Catalog Configuration, change as per your environment
    
    CATALOG_URI = "https://<account_identifier>.snowflakecomputing.com/polaris/api/catalog"
    ROLE = "<role>"
    HORIZON_SESSION_ROLE = f"session:role:{ROLE}"
    CATALOG_NAME = "<database_name>" #provide in UPPER CASE
    SF_URL= "<account_identifier>.snowflakecomputing.com"
    SF_USER = "<user_name>" #provide in UPPER CASE
    SF_PASSWORD = "<user_password>"
    SF_SCHEMA = "<schema_name>" #provide in UPPER CASE
    SF_WAREHOUSE = "<warehouse_name>" #provide in UPPER CASE
    
    
    # Cloud Service Provider Region Configuration (where the Iceberg data is stored)
    REGION = "<region_name>"
    
    # Paste the External Oauth Access token that you generated in Snowflake here
    ACCESS_TOKEN = "<your_access_token>"
    
    # Paste the PAT you generated in Snowflake here
    PAT_TOKEN = "<your_PAT_token>"
    
    # Iceberg Version
    ICEBERG_VERSION = "1.9.1"
    
    #Snowflake Connector for Spark
    DRIVER_VERSION = "3.24.0" # (or above)
    SNOWFLAKE_CONNECTOR_VERSION = "3.1.6"
    
    
    try:
        spark.stop()
    except:
        pass
    
      spark = (
          SparkSession.builder
    
          .master("local[*]")
    .config("spark.ui.port", "0")
          .config("spark.driver.bindAddress", "127.0.0.1")
          .config("spark.driver.host", "127.0.0.1")
          .config("spark.driver.port", "0")
          .config("spark.blockManager.port", "0")
    
    
    # JAR Dependencies for Iceberg, Azure and Snowflake Connector for Spark
          .config(
     "spark.jars.packages",
     f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{ICEBERG_VERSION},"
     f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION},"
    
       # for Azure storage, use the below package and comment above azure bundle
              # f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}"
    # for Snowflake Connector for Spark
     f"net.snowflake:snowflake-jdbc:{DRIVER_VERSION},"
     f"net.snowflake:spark-snowflake_2.12:{SNOWFLAKE_CONNECTOR_VERSION}"
    
    )
          # Iceberg SQL Extensions
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .config("spark.sql.defaultCatalog", "horizoncatalog")
    .config("spark.sql.catalog.horizoncatalog", "org.apache.spark.sql.snowflake.catalog.SnowflakeFallbackCatalog")
    
      #Horizon REST Catalog Configuration
       .config(f"spark.sql.catalog.horizoncatalog.catalog-impl", "org.apache.iceberg.spark.SparkCatalog")
          .config(f"spark.sql.catalog.horizoncatalog.type", "rest")
          .config(f"spark.sql.catalog.horizoncatalog.uri", CATALOG_URI)
          .config(f"spark.sql.catalog.horizoncatalog.warehouse", CATALOG_NAME)
          .config(f"spark.sql.catalog.horizoncatalog.scope", HORIZON_SESSION_ROLE)
          .config(f"spark.sql.catalog.horizoncatalog.client.region", REGION)
          .config(f"spark.sql.catalog.horizoncatalog.credential", PAT_TOKEN)
    # for External Oauth use below and comment above configuration .token
    #.config(f"spark.sql.catalog.horizoncatalog.token", ACCESS_TOKEN)
    
    .config("spark.sql.catalog.horizoncatalog.io-impl","org.apache.iceberg.aws.s3.S3FileIO")
    # Enforcing policies using Snowflake Connector for Spark
    .config("spark.snowflake.sfURL", SF_URL)
    .config("spark.snowflake.sfUser", SF_USER)
    .config("spark.snowflake.sfPassword", SF_PASSWORD)
    # for External Oauth uncomment below and comment above configurations for user and password
    #.config("spark.snowflake.sfAuthenticator","oauth")
    #.config("spark.snowflake.sfToken",ACCESS_TOKEN)
    .config("spark.snowflake.sfDatabase", CATALOG_NAME)
    .config("spark.snowflake.sfSchema",SF_SCHEMA) # Optional
    .config("spark.snowflake.sfRole",ROLE)
    .config("spark.snowflake.sfWarehouse",SF_WAREHOUSE)
    
      # Required for vended credentials
     .config(f"spark.sql.catalog.horizoncatalog.header.X-Iceberg-Access-Delegation", "vended-credentials")
          .config("spark.sql.iceberg.vectorization.enabled", "false")
          .getOrCreate()
      )
      spark.sparkContext.setLogLevel("ERROR")
    
    Copy

    Onde:

    • <account_identifier> é o identificador da sua conta Snowflake que contém as tabelas Iceberg que você deseja consultar. Para encontrar esse identificador, consulte Identificadores de conta.

    • <your_access_token> é o seu token de acesso que você obteve. Para obter um token de acesso, consulte Obter um token de acesso para autenticação.

      Nota

      Para o OAuth externo, como alternativa, você pode configurar sua conexão com o mecanismo com atualização automática de token em vez de especificar um token de acesso.

    • <database_name> é o nome do banco de dados em sua conta Snowflake que contém as tabelas Iceberg gerenciadas pelo Snowflake que você deseja consultar.

      Nota

      As seguintes propriedades no Spark esperam o nome do seu banco de dados Snowflake, não o nome do seu warehouse Snowflake:

      • .warehouse

      • .sfDatabase

    • <role> é a função no Snowflake que está configurada com acesso às tabelas Iceberg que você deseja consultar. Por exemplo: DATA_ENGINEER.

    • <user_name> é o nome de usuário usado para acessar tabelas no Snowflake.

    • <user_password> é a senha do usuário que acessa as tabelas.

      Nota

      Essa senha pode ser o token de acesso programático (PAT) que você obteve para autenticação, se aplicável.

    • <schema_name> é o esquema no Snowflake em que as tabelas são armazenadas. Ele é opcional.

    • <warehouse_name> é o nome do warehouse Snowflake (instância de computação) que você deseja usar para avaliar as políticas.

    Importante

    Por padrão, o exemplo de código está configurado para tabelas Apache Iceberg™ armazenadas no Amazon S3. Se suas tabelas Iceberg estiverem armazenadas no Armazenamento do Azure (ADLS), execute as seguintes etapas:

    1. Comente a seguinte linha: f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}"

    2. Remova o comentário da seguinte linha: # f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}"

Conectar o Spark a tabelas Iceberg usando autenticação do OAuth externo ou de par de chaves

O exemplo de código anterior mostra a configuração para conexão usando um token de acesso programático (PAT).

Para conectar o Spark às tabelas Iceberg usando autenticação do OAuth externo ou de par de chaves, siga estas etapas para alterar o exemplo de código anterior:

  1. Para <your_access_token>, especifique seu token de acesso para autenticação do OAuth externo ou de par de chaves.

    Para obter um token de acesso, consulte Etapa 3: Obter um token de acesso para autenticação.

  2. Comente a seguinte linha: .config(f"spark.sql.catalog.{CATALOG_NAME}.credential", PAT_TOKEN)

  3. Remova o comentário da seguinte linha: #.config(f"spark.sql.catalog.{CATALOG_NAME}.token", ACCESS_TOKEN)

Etapa 3: Consultar tabelas Iceberg usando o Spark

Use o Spark para ler tabelas Iceberg protegidas pelas políticas de proteção de dados do Snowflake. O Spark pode rotear automaticamente consultas de tabelas protegidas pelas políticas do Snowflake por meio do Snowflake para garantir uma aplicação consistente.

Consulta a uma tabela

spark.sql("SHOW NAMESPACES").show(truncate=False)
spark.sql("USE horizoncatalog.<schema_name>")
spark.sql("SHOW TABLES").show(truncate=False)
spark.sql("Select * from <your_table_name_in_snowflake>").show(truncate=False)
Copy

Monitorar uma consulta para avaliação de políticas

Em sua conta Snowflake, você pode monitorar a atividade de consulta no Snowflake para consultas que são roteadas do Spark ao Snowflake para avaliação de política.

Considerações para configurar políticas de proteção de dados

Considere os itens a seguir ao configurar políticas de proteção de dados:

  • A aplicação de políticas de proteção de dados a tabelas Iceberg que você consulta usando o Spark só é permitida quando as seguintes políticas de proteção de dados estão definidas nas tabelas:

    • Políticas de mascaramento

    • Políticas de mascaramento baseadas em tags

    • Políticas de acesso a linhas

    Consultas em tabelas protegidas por todas as outras políticas resultam em um erro.