Apache Spark™에서 Apache Iceberg™ 테이블 쿼리 시 데이터 보호 정책 적용¶
이 항목에서는 Snowflake Horizon Catalog를 통해 Apache Spark™로 액세스할 때 Apache Iceberg™ 테이블에 설정된 데이터 보호 정책을 적용하는 방법을 설명합니다. 데이터 보호 정책을 적용하려면 Spark용 Snowflake 커넥터인 *Spark 커넥터*를 설치합니다. Spark 커넥터에 대한 자세한 내용은 Spark용 Snowflake 커넥터 섹션을 참조하세요.
Spark 커넥터는 컴퓨팅의 효율적인 사용과 일관된 적용을 보장하는 Snowflake를 통해 쿼리를 라우팅하여 Snowflake 정책으로 보호되는 테이블 쿼리를 지원합니다. Spark 커넥터는 Snowflake를 통해 쓰기를 라우팅하여 Snowflake 정책으로 보호되는 테이블에 대한 쓰기 작업 수행도 지원합니다.
참고
또한 Spark 커넥터는 Snowflake Horizon Catalog를 통해 Spark 세션 컴퓨팅을 사용하여, 세분화된 데이터 보호 정책 없이 Apache Iceberg 테이블을 직접 쿼리하는 기능도 지원합니다.
Spark에서 Iceberg 테이블을 쿼리할 때 데이터 보호 정책을 적용하는 워크플로¶
Spark에서 Iceberg 테이블을 쿼리할 때 데이터 보호 정책을 적용하려면 다음 단계를 완료합니다.
:ref:`데이터 보호 정책을 구성<label-enforce_access_policies_configure_policies>`합니다.
:ref:`Snowflake Spark Connector를 사용하여 Spark를 Iceberg 테이블에 연결<label-connect_spark_enforce_access_policies_connect_spark>`합니다. 여기에는 Spark용 Snowflake Connector 다운로드 및 Snowflake Horizon Catalog를 통해 Spark를 Iceberg 테이블에 연결하는 작업이 포함됩니다.
:ref:`Iceberg 테이블을 쿼리<label-connect_spark_enforce_access_policies_query_tables>`합니다.
지원되는 데이터 보호 정책¶
다음 데이터 보호 정책이 지원됩니다.
다른 데이터 정책으로 보호되는 테이블에 대한 쿼리는 오류를 발생시킵니다.
전제 조건¶
다음 정보를 검색합니다.
테이블을 쿼리할 Snowflake 사용자의 사용자 이름
쿼리하려는 테이블이 포함된 Snowflake 데이터베이스의 이름
정책 평가에 사용할 Snowflake의 가상 웨어하우스 이름
쿼리하려는 Iceberg 테이블이 포함된 Snowflake 계정의 계정 식별자를 검색합니다. 자세한 지침은 계정 식별자 섹션을 참조하십시오. :ref:`데이터 액세스 정책이 적용된 Iceberg 테이블에 Spark를 연결<label-connect_spark_enforce_access_policies_connect_spark>`할 때 이 식별자를 지정합니다.
팁
SQL을 사용하여 계정 식별자를 가져오려면 다음 명령을 실행합니다.
SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME();
1단계: 데이터 보호 정책 구성¶
중요
쿼리하려는 Iceberg 테이블에 데이터 보호 정책을 이미 설정한 경우 다음 단계로 진행합니다.
이 단계에서는 데이터 보호 정책을 구성합니다.
데이터 보호 정책을 구성하려면 쿼리하려는 Iceberg 테이블에 대한 데이터 액세스 정책을 설정합니다.
마스킹 정책을 할당하려면 Dynamic Data Masking 이해하기 섹션을 참조하세요.
태그 기반 마스킹 정책을 할당하려면 태그 기반 마스킹 정책 섹션을 참조하세요.
행 액세스 정책을 할당하려면 행 액세스 정책 이해하기 섹션을 참조하세요.
2단계: Spark용 Snowflake Connector를 사용하여 Spark를 Iceberg 테이블에 연결하기¶
이 단계에서는 Horizon Catalog를 통해 Spark를 Iceberg 테이블에 연결합니다. 이 연결을 통해 테이블에 적용된 데이터 보호 정책과 함께 Spark를 사용하여 테이블을 쿼리할 수 있습니다.
Spark용 Snowflake Connector(Spark Connector)를 사용하여 Spark를 Iceberg 테이블에 연결하려면 먼저 Spark Connector를 다운로드한 후 Spark를 Iceberg 테이블에 연결합니다.
Spark용 Snowflake Connector 다운로드하기¶
Spark용 Snowflake Connector 3.1.6 이상 버전을 다운로드하려면 :doc:`/user-guide/spark-connector-install`의 지침을 따릅니다.
Spark를 Iceberg 테이블에 연결하기¶
이 단계에서는 Horizon Catalog를 통해 Spark를 Iceberg 테이블에 연결합니다. 이 연결에는 Horizon Catalog가 포함된 Spark용 Snowflake Connector를 사용하여 Snowflake 데이터 보호 정책으로 보호되는 테이블을 쿼리하기 위한 구성이 포함됩니다.
참고
외부 OAuth 또는 키 페어 인증을 사용하는 경우 외부 OAuth 또는 키 페어 인증을 사용하여 Spark를 Iceberg 테이블에 연결하기 섹션을 참조하세요.
프로그래밍 방식 액세스 토큰(PAT)을 사용하여 Spark를 Iceberg 테이블에 연결하려면 다음 예제 PySpark 코드를 사용합니다.
from pyspark.sql import SparkSession # Snowflake Horizon Catalog Configuration, change as per your environment CATALOG_URI = "https://<account_identifier>.snowflakecomputing.com/polaris/api/catalog" ROLE = "<role>" HORIZON_SESSION_ROLE = f"session:role:{ROLE}" CATALOG_NAME = "<database_name>" #provide in UPPER CASE SF_URL= "<account_identifier>.snowflakecomputing.com" SF_USER = "<user_name>" #provide in UPPER CASE SF_PASSWORD = "<user_password>" SF_SCHEMA = "<schema_name>" #provide in UPPER CASE SF_WAREHOUSE = "<warehouse_name>" #provide in UPPER CASE # Cloud Service Provider Region Configuration (where the Iceberg data is stored) REGION = "<region_name>" # Paste the External Oauth Access token that you generated in Snowflake here ACCESS_TOKEN = "<your_access_token>" # Paste the PAT you generated in Snowflake here PAT_TOKEN = "<your_PAT_token>" # Iceberg Version ICEBERG_VERSION = "1.9.1" #Snowflake Connector for Spark DRIVER_VERSION = "3.24.0" # (or above) SNOWFLAKE_CONNECTOR_VERSION = "3.1.6" try: spark.stop() except: pass spark = ( SparkSession.builder .master("local[*]") .config("spark.ui.port", "0") .config("spark.driver.bindAddress", "127.0.0.1") .config("spark.driver.host", "127.0.0.1") .config("spark.driver.port", "0") .config("spark.blockManager.port", "0") # JAR Dependencies for Iceberg, Azure and Snowflake Connector for Spark .config( "spark.jars.packages", f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{ICEBERG_VERSION}," f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}," # for Azure storage, use the below package and comment above azure bundle # f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}" # for Snowflake Connector for Spark f"net.snowflake:snowflake-jdbc:{DRIVER_VERSION}," f"net.snowflake:spark-snowflake_2.12:{SNOWFLAKE_CONNECTOR_VERSION}" ) # Iceberg SQL Extensions .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.defaultCatalog", "horizoncatalog") .config("spark.sql.catalog.horizoncatalog", "org.apache.spark.sql.snowflake.catalog.SnowflakeFallbackCatalog") #Horizon REST Catalog Configuration .config(f"spark.sql.catalog.horizoncatalog.catalog-impl", "org.apache.iceberg.spark.SparkCatalog") .config(f"spark.sql.catalog.horizoncatalog.type", "rest") .config(f"spark.sql.catalog.horizoncatalog.uri", CATALOG_URI) .config(f"spark.sql.catalog.horizoncatalog.warehouse", CATALOG_NAME) .config(f"spark.sql.catalog.horizoncatalog.scope", HORIZON_SESSION_ROLE) .config(f"spark.sql.catalog.horizoncatalog.client.region", REGION) .config(f"spark.sql.catalog.horizoncatalog.credential", PAT_TOKEN) # for External Oauth use below and comment above configuration .token #.config(f"spark.sql.catalog.horizoncatalog.token", ACCESS_TOKEN) .config("spark.sql.catalog.horizoncatalog.io-impl","org.apache.iceberg.aws.s3.S3FileIO") # Enforcing policies using Snowflake Connector for Spark .config("spark.snowflake.sfURL", SF_URL) .config("spark.snowflake.sfUser", SF_USER) .config("spark.snowflake.sfPassword", SF_PASSWORD) # for External Oauth uncomment below and comment above configurations for user and password #.config("spark.snowflake.sfAuthenticator","oauth") #.config("spark.snowflake.sfToken",ACCESS_TOKEN) .config("spark.snowflake.sfDatabase", CATALOG_NAME) .config("spark.snowflake.sfSchema",SF_SCHEMA) # Optional .config("spark.snowflake.sfRole",ROLE) .config("spark.snowflake.sfWarehouse",SF_WAREHOUSE) # Required for vended credentials .config(f"spark.sql.catalog.horizoncatalog.header.X-Iceberg-Access-Delegation", "vended-credentials") .config("spark.sql.iceberg.vectorization.enabled", "false") .getOrCreate() ) spark.sparkContext.setLogLevel("ERROR")
여기서
<account_identifier>`는 쿼리하려는 Iceberg 테이블이 포함된 Snowflake 계정의 Snowflake 계정 식별자입니다. 이 식별자를 찾으려면 :doc:/user-guide/admin-account-identifier` 섹션을 참조하세요.:code:`<your_access_token>`은 획득한 액세스 토큰입니다. 액세스 토큰을 얻으려면 :ref:`인증을 위한 액세스 토큰 얻기<label-tables_iceberg_query_using_external_query_engine_snowflake_horizon_generate_access_token>`를 참조하세요.
참고
또는 외부 OAuth의 경우 액세스 토큰을 지정하는 대신 자동 토큰 새로 고침을 사용하여 엔진에 대한 연결을 구성할 수 있습니다.
:code:`<database_name>`은 쿼리하려는 Snowflake 관리 Iceberg 테이블이 포함된 Snowflake 계정의 데이터베이스 이름입니다.
참고
Spark의 다음 속성에는 Snowflake 웨어하우스 이름이 아닌 Snowflake 데이터베이스 이름이 필요합니다.
.warehouse.sfDatabase
:code:`<role>`은 쿼리하려는 Iceberg 테이블에 대한 액세스 권한으로 구성된 Snowflake의 역할입니다. 예를 들어 DATA_ENGINEER 과 같습니다.
:code:`<user_name>`은 Snowflake에서 테이블에 액세스하는 데 사용되는 사용자 이름입니다.
:code:`<user_password>`는 테이블에 액세스하는 사용자의 비밀번호입니다.
참고
이 비밀번호는 해당하는 경우 인증을 위해 얻은 프로그래밍 방식 액세스 토큰(PAT)일 수 있습니다.
:code:`<schema_name>`은 테이블이 저장되는 Snowflake의 스키마입니다. 이는 선택 사항입니다.
:code:`<warehouse_name>`은 정책 평가에 사용할 Snowflake 웨어하우스(컴퓨팅 인스턴스) 이름입니다.
중요
기본적으로 코드 예제는 Amazon S3에 저장된 Apache Iceberg™ 테이블에 대해 설정됩니다. Iceberg 테이블이 Azure Storage(ADLS)에 저장되는 경우 다음 단계를 수행합니다.
다음 줄을 주석으로 처리합니다.
f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}"다음 줄의 주석 처리를 제거합니다.
# f"org.apache.iceberg:iceberg-azure-bundle:{ICEBERG_VERSION}"
외부 OAuth 또는 키 페어 인증을 사용하여 Spark를 Iceberg 테이블에 연결하기¶
이전 코드 예제에서는 프로그래밍 방식 액세스 토큰(PAT)을 사용하여 연결하기 위한 구성을 보여줍니다.
외부 OAuth 또는 키 페어 인증을 사용하여 Spark를 Iceberg 테이블에 연결하려면 다음 단계에 따라 이전 코드 예제를 변경합니다.
:code:`<your_access_token>`의 경우 외부 OAuth 또는 키 페어 인증에 대한 액세스 토큰을 지정합니다.
액세스 토큰을 얻으려면 3단계: 인증을 위한 액세스 토큰 얻기 섹션을 참조하세요.
다음 줄을 주석으로 처리합니다.
.config(f"spark.sql.catalog.{CATALOG_NAME}.credential", PAT_TOKEN)다음 줄의 주석 처리를 제거합니다.
#.config(f"spark.sql.catalog.{CATALOG_NAME}.token", ACCESS_TOKEN)
3단계: Spark를 사용하여 Iceberg 테이블 쿼리하기¶
Spark를 사용하여 Snowflake 데이터 보호 정책으로 보호되는 Iceberg 테이블을 읽습니다. Spark는 Snowflake 정책으로 보호되는 테이블의 쿼리를 Snowflake를 통해 자동으로 라우팅하여 일관된 적용을 보장할 수 있습니다.
테이블 쿼리하기¶
spark.sql("SHOW NAMESPACES").show(truncate=False)
spark.sql("USE horizoncatalog.<schema_name>")
spark.sql("SHOW TABLES").show(truncate=False)
spark.sql("Select * from <your_table_name_in_snowflake>").show(truncate=False)
정책 평가를 위한 쿼리 모니터링하기¶
정책 평가를 위해 Spark에서 Snowflake로 라우팅되는 쿼리에 대한 Snowflake의 쿼리 활동을 모니터링하려면 Snowflake 계정에서 쿼리 활동을 모니터링하면 됩니다.
Snowflake에서 쿼리 기록을 모니터링하려면 :doc:`/user-guide/ui-snowsight-activity`의 지침을 따릅니다.
데이터 보호 정책 구성 시 고려 사항¶
데이터 보호 정책을 구성할 때 다음 사항을 고려합니다.
Spark를 사용하여 쿼리하는 Iceberg 테이블에 데이터 보호 정책 적용은 테이블에 다음 데이터 보호 정책이 설정된 경우에만 지원됩니다.
마스킹 정책
태그 기반 마스킹 정책
행 액세스 정책
다른 모든 정책으로 보호되는 테이블에 대한 쿼리는 오류를 발생시킵니다.