外部 OAuth でSnowflake Open Catalogに接続する

このトピックでは、クライアントアプリケーションを使用した外部 OAuth でSnowflake Open Catalogに接続する方法について説明します。

このトピックのコード例は、Apache Spark™ を使用して接続する方法を示しています。コード例は PySpark にあります。

注釈

Snowflakeを使用してOpen Catalogの 管理テーブルをクエリする場合は、外部 OAuth を使用するSnowflakeのカタログ統合を作成できます。詳細については、Snowflakeドキュメントの[CREATE CATALOG INTEGRATION(Snowflake Open Catalog)](https://docs.snowflake.com/en/sql-reference/sql/create-catalog-integration-open-catalog)をご参照ください。

前提条件

外部 OAuth でOpen Catalogに接続する前に、Open Catalogに外部 OAuth を構成する必要があります。手順については、[Snowflake Open Catalogで外部 OAuth を構成する](external-oauth-configure.md)をご参照ください。

自動更新トークンを使用してOpen Catalogに接続する(推奨メソッド)

このメソッドを使用すると、自動更新トークンを使用して接続するため、手動でトークンを更新する必要がありません。

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,<maven_coordinate>') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://<open_catalog_account_identifier>.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.warehouse','<catalog_name>') \
    .config('spark.sql.catalog.opencatalog.rest.auth.type','oauth2') \
    .config('spark.sql.catalog.opencatalog.oauth2-server-uri','<oauth2_server_uri>') \
    .config('spark.sql.catalog.opencatalog.credential','<oauth_client_id>:<oauth_client_secret>') \
    .config('spark.sql.catalog.opencatalog.scope','SESSION:ROLE:<custom_role>') \
    .config('spark.sql.catalog.opencatalog.audience','https://<open_catalog_account_identifier>.snowflakecomputing.com') \
    .getOrCreate()
Copy

パラメーター

パラメーター

説明

<catalog_name>

接続先のカタログ名を指定します。

重要:
<catalog_name> は大文字と小文字を区別します。

<maven_coordinate>

外部クラウドストレージプロバイダーのMaven座標を指定します。

  • S3: software.amazon.awssdk:bundle:2.20.160
  • クラウドストレージ(Google): org.apache.iceberg:iceberg-gcp-bundle:1.5.2
  • Azure: org.apache.iceberg:iceberg-azure-bundle:1.5.2
このパラメーターが表示されない場合は、コードサンプルに正しい値がすでに指定されています。

<open_catalog_account_identifier>

オープンカタログアカウントのアカウント識別子を指定します。

アカウントのリージョンやクラウドプラットフォームによっては、この識別子はそれ自体がアカウントのロケータになることもあれば(例えば、 xy12345)、追加のセグメントを含むこともあります。詳細については、識別子としてのアカウントロケーターの使用をご参照ください。

<oauth2_server_uri>

あなたの OAuth2 サーバー URI。

<oauth_client_id>

あなたの OAuth2 クライアント ID。

<oauth_client_secret>

あなたの OAuth2 クライアントシークレット。

<custom_role>

サービスプリンシパルに権限を付与するOpen Catalogのカスタムロールの名前。

アクセストークンを使用してOpen Catalogに接続する

必要に応じて、アクセストークンを使用してOpen Catalogに接続することができます。ただし、アクセストークンは期限切れになるため、手動で更新する必要があります。あるいは、[自動リフレッシュトークンを使用して接続する](#connect-with-open-catalog-by-using-autotic-refresh-token-prefered-method)こともできます。

以下のコード例は、Sparkを使用してOpen Catalogに接続するためのものです。

パラメーター

パラメーター

説明

<catalog_name>

接続先のカタログ名を指定します。

重要:
<catalog_name> は大文字と小文字を区別します。

<maven_coordinate>

外部クラウドストレージプロバイダーのMaven座標を指定します。

  • S3: software.amazon.awssdk:bundle:2.20.160
  • クラウドストレージ(Google): org.apache.iceberg:iceberg-gcp-bundle:1.5.2
  • Azure: org.apache.iceberg:iceberg-azure-bundle:1.5.2
このパラメーターが表示されない場合は、コードサンプルに正しい値がすでに指定されています。

<access_token>

使用するクライアントアプリケーションのアクセストークンを指定します。

[Open Catalogの外部 OAuth 構成時に生成したアクセストークン](external-oauth-configure.md#optional-step-9-generate-the-access-token)を入力します。

<open_catalog_account_identifier>

オープンカタログアカウントのアカウント識別子を指定します。

アカウントのリージョンやクラウドプラットフォームによっては、この識別子はそれ自体がアカウントのロケータになることもあれば(例えば、 xy12345)、追加のセグメントを含むこともあります。詳細については、識別子としてのアカウントロケーターの使用をご参照ください。

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,<maven_coordinate>') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://<open_catalog_account_identifier>.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.token','<access_token>') \
    .config('spark.sql.catalog.opencatalog.warehouse','<catalog_name>') \
    .getOrCreate()
Copy

クロスリージョン接続で接続する(Amazon S3のみ)

以下のコード例は、次のような場合にOpen Catalogに接続するためのものです。

  • オープンカタログアカウントがAmazon S3でホストされている。

  • 外部ストレージのプロバイダーがAmazon S3である。

  • オープンカタログアカウントがApache Iceberg™テーブルを含むストレージバケットがあるS3リージョンとは異なるS3リージョンでホストされている。

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,software.amazon.awssdk:bundle:2.20.160') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://<open_catalog_account_identifier>.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.token','<access_token>') \
    .config('spark.sql.catalog.opencatalog.warehouse','<catalog_name>') \
    .config('spark.sql.catalog.opencatalog.client.region','<target_s3_region>') \
    .getOrCreate()
Copy

パラメーター

パラメーター

説明

<catalog_name>

接続先のカタログ名を指定します。

重要:
<catalog_name> は大文字と小文字を区別します。

<access_token>

使用するクライアントアプリケーションのアクセストークンを指定します。

[Open Catalogの外部 OAuth 構成時に生成したアクセストークン](external-oauth-configure.md#optional-step-9-generate-the-access-token)を入力します。

<open_catalog_account_identifier>

オープンカタログアカウントのアカウント識別子を指定します。アカウントのリージョンやクラウドプラットフォームによっては、この識別子はそれ自体がアカウントのロケータになることもあれば(例えば、 xy12345)、追加のセグメントを含むこともあります。詳細については、識別子としてのアカウントロケーターの使用をご参照ください。

<target_s3_region>

Apache Icebergテーブルを含むS3バケットがあるリージョンコードを指定します。リージョンコードについては、AWS サービスエンドポイント をご覧になり、表のリージョン列をご参照ください。

このセクションには、Sparkを使用してOpen Catalogに接続する例が含まれています。

例1:接続(S3)

以下をご参照ください。

  • [自動更新を使用した接続(S3)](#connect-by-using-autotical-refresh-s3)

  • [アクセストークンを使用した接続(S3)](#connect-by-using-access-token-s3)

自動更新を使用して接続する(S3)

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,software.amazon.awssdk:bundle:2.20.160') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
    .config('spark.sql.catalog.opencatalog.rest.auth.type','oauth2') \
    .config('spark.sql.catalog.opencatalog.oauth2-server-uri','your-tenant.region.auth0.com') \
    .config('spark.sql.catalog.opencatalog.credential','11111111111111111111111111111111:222222222222222222222222222222222222222222222222222222222222222222') \
    .config('spark.sql.catalog.opencatalog.scope','SESSION:ROLE:DATA_ENG') \
    .config('spark.sql.catalog.opencatalog.audience','https://ab12345.snowflakecomputing.com') \
    .getOrCreate()
Copy

アクセストークンを使用して接続する(S3)

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,software.amazon.awssdk:bundle:2.20.160') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.token','0000000000000000000000000001111111111111111111111111111111111111111111') \
    .config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
    .getOrCreate()
Copy

例2:接続(Google Cloud Storage)

以下をご参照ください。

  • [自動更新を使用した接続(Google Cloud Storage)](#connect-by-using-automac-refresh-cloud-storage-from-google)

  • [アクセストークンを使用した接続(Google Cloud Storage)](#connect-by-using-access-token-cloud-storage-from-google)

自動更新を使用して接続する(Google Cloud Storage)

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-gcp-bundle:1.5.2') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
    .config('spark.sql.catalog.opencatalog.rest.auth.type','oauth2') \
    .config('spark.sql.catalog.opencatalog.oauth2-server-uri','your-tenant.region.auth0.com') \
    .config('spark.sql.catalog.opencatalog.credential','11111111111111111111111111111111:222222222222222222222222222222222222222222222222222222222222222222') \
    .config('spark.sql.catalog.opencatalog.scope','SESSION:ROLE:DATA_ENG') \
    .config('spark.sql.catalog.opencatalog.audience','https://ab12345.snowflakecomputing.com') \
    .getOrCreate()
Copy

アクセストークンを使用して接続する(Google Cloud Storage)

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-gcp-bundle:1.5.2') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.token','0000000000000000000000000001111111111111111111111111111111111111111111') \
    .config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
    .getOrCreate()
Copy

例3:接続(Azure)

以下をご参照ください。

  • [自動更新を使用した接続(Azure)](#connect-by-using-autotical-refresh-azure)

  • [アクセストークンを使用した接続(Azure)](#connect-by-using-access-token-azure)

自動更新を使用して接続する(Azure)

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-azure-bundle:1.5.2') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
    .config('spark.sql.catalog.opencatalog.rest.auth.type','oauth2') \
    .config('spark.sql.catalog.opencatalog.oauth2-server-uri','your-tenant.region.auth0.com') \
    .config('spark.sql.catalog.opencatalog.credential','11111111111111111111111111111111:222222222222222222222222222222222222222222222222222222222222222222') \
    .config('spark.sql.catalog.opencatalog.scope','SESSION:ROLE:DATA_ENG') \
    .config('spark.sql.catalog.opencatalog.audience','https://ab12345.snowflakecomputing.com') \
    .getOrCreate()
Copy

アクセストークンを使用して接続する(Azure)

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-azure-bundle:1.5.2') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.token','0000000000000000000000000001111111111111111111111111111111111111111111') \
    .config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
    .getOrCreate()
Copy

オープンカタログへの接続の確認

Sparkがオープンカタログに接続されていることを確認するには、カタログの名前空間をリスト表示します。詳細については、名前空間のリスト表示をご参照ください。