PythonでSnowflake統合を管理する

Pythonを使用して、Snowflakeでさまざまなタイプの統合を管理できます。

前提条件

このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。

たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。

カタログ統合の管理

アカウントで Apache Iceberg™ テーブルのカタログ統合を管理できます。カタログ統合とは、 SnowflakeをIcebergカタログとして使用しない場合、または Snowflake Open Catalog と統合したい場合に、シナリオ用のIcebergテーブルのメタデータをどのように整理するかについての情報を格納する、名前付きのアカウントレベルのSnowflakeオブジェクトです。詳細については、 Apache Iceberg™ テーブルカタログ統合 セクションをご参照ください。

注釈

ALTER CATALOG INTEGRATION は現在サポートされていません。

Snowflake Python APIs は、2つのタイプに分かれたカタログ統合を表しています。

  • CatalogIntegration: カタログ統合の名前、テーブル形式、カタログ設定などのプロパティを公開します。

  • CatalogIntegrationResource: 対応する CatalogIntegration オブジェクトを取得し、データベースを削除するために使用できるメソッドを公開します。

カタログ統合の作成

カタログ統合を作成するには、まず CatalogIntegration オブジェクトを作成して、 API Root オブジェクトから CatalogIntegrationCollection オブジェクトを作成します。 CatalogIntegrationCollection.create を使用して、新しいカタログ統合をSnowflakeに追加します。

アカウントで、次のタイプの外部 Iceberg カタログのカタログ統合を作成できます。

AWS Glue

以下の例のコードは、指定されたプロパティを持つ AWS Glue を使用する Iceberg テーブルの my_catalog_integration という名前のカタログ統合を表す CatalogIntegration オブジェクトを作成します。

from snowflake.core.catalog_integration import CatalogIntegration, Glue

root.catalog_integrations.create(CatalogIntegration(
    name="my_catalog_integration",
    catalog = Glue(
        catalog_namespace="abcd-ns",
        glue_aws_role_arn="arn:aws:iam::123456789012:role/sqsAccess",
        glue_catalog_id="1234567",
    ),
    table_format="ICEBERG",
    enabled=True,
))
Copy

オブジェクトストア

次の例のコードは、オブジェクト・ストアを使用するIcebergテーブルの my_catalog_integration というカタログ統合を表す CatalogIntegration オブジェクトを作成します。

from snowflake.core.catalog_integration import CatalogIntegration, ObjectStore

root.catalog_integrations.create(CatalogIntegration(
    name="my_catalog_integration",
    catalog = ObjectStore(),
    table_format="ICEBERG",
    enabled=True,
))
Copy

Snowflake Open Catalog

以下の例のコードは、指定されたプロパティを持つ Open Catalog を使用する Iceberg テーブルの my_catalog_integration という名前のカタログ統合を表す CatalogIntegration オブジェクトを作成します。

from snowflake.core.catalog_integration import CatalogIntegration, OAuth, Polaris, RestConfig

root.catalog_integrations.create(CatalogIntegration(
    name="my_catalog_integration",
    catalog = Polaris(
        catalog_namespace="abcd-ns",
        rest_config=RestConfig(
            catalog_uri="https://my_account.snowflakecomputing.com/polaris/api/catalog",
            warehouse="my-warehouse",
        ),
        rest_authentication=OAuth(
            type="OAUTH",
            oauth_client_id="my_client_id",
            oauth_client_secret="my_client_secret",
            oauth_allowed_scopes=["PRINCIPAL_ROLE:ALL"],
        ),
    ),
    table_format="ICEBERG",
    enabled=True,
))
Copy

カタログ統合の詳細の取得

CatalogIntegration オブジェクトを返す CatalogIntegrationResource.fetch メソッドを呼び出すことで、カタログ統合に関する情報を取得できます。

次の例のコードは、 my_catalog_integration という名前のカタログ統合の情報を取得します。

my_catalog_integration = root.catalog_integrations["my_catalog_integration"].fetch()
print(my_catalog_integration.to_dict())
Copy

カタログ統合のリスト

CatalogIntegration オブジェクトの PagedIter 反復子を返す CatalogIntegrationCollection.iter メソッドを使用して、カタログ統合を一覧表示することができます。

次の例のコードは、名前が my で始まるカタログ統合を一覧表示し、それぞれの名前を表示します。

catalog_integration_iter = root.catalog_integrations.iter(like="my%")
for catalog_integration_obj in catalog_integration_iter:
  print(catalog_integration_obj.name)
Copy

カタログ統合のドロップ

カタログ統合を CatalogIntegrationResource オブジェクトにドロップできます。

次の例のコードは、 my_catalog_integration カタログ統合リソース・オブジェクトを取得し、カタログ統合をドロップします。

my_catalog_integration_res = root.catalog_integrations["my_catalog_integration"]
my_catalog_integration_res.drop()
Copy

通知統合の管理

通知統合を管理することができます。これは、サードパーティのクラウドメッセージキューイングサービス、メールサービス、WebhookなどのサードパーティのメッセージングサービスとSnowflakeの間のインタフェースを提供するSnowflakeオブジェクトです。詳細については、 Snowflakeでの通知 をご参照ください。

注釈

ALTER NOTIFICATION INTEGRATION は現在サポートされていません。

Snowflake Python APIs は、2つのタイプに分かれた通知統合を表しています。

  • NotificationIntegration: 通知統合の名前や通知フック設定などのプロパティを公開します。

  • NotificationIntegrationResource: 対応する NotificationIntegration オブジェクトを取得し、通知統合をドロップするために使用できるメソッドを公開します。

通知統合の作成

通知統合を作成するには、まず NotificationIntegration オブジェクトを作成して、 API Root オブジェクトから NotificationIntegrationCollection オブジェクトを作成します。 NotificationIntegrationCollection.create を使用して、新しい通知統合をSnowflakeに追加します。

以下のタイプのメッセージングサービスの通知統合を作成できます。

メール

次の例のコードは、指定された NotificationEmail プロパティを持つ my_email_notification_integration という名前の通知統合を表す NotificationIntegration オブジェクトを作成します。

from snowflake.core.notification_integration import NotificationEmail, NotificationIntegration

my_notification_integration = NotificationIntegration(
  name="my_email_notification_integration",
  notification_hook=NotificationEmail(
      allowed_recipients=["test1@snowflake.com", "test2@snowflake.com"],
      default_recipients=["test1@snowflake.com"],
      default_subject="test default subject",
  ),
  enabled=True,
)

root.notification_integrations.create(my_notification_integration)
Copy

ウェブフック

次の例のコードは、指定された NotificationWebhook プロパティを持つ my_webhook_notification_integration という名前の通知統合を表す NotificationIntegration オブジェクトを作成します。

from snowflake.core.notification_integration import NotificationIntegration, NotificationWebhook

my_notification_integration = NotificationIntegration(
  name="my_webhook_notification_integration",
  enabled=False,
  notification_hook=NotificationWebhook(
      webhook_url=webhook_url,
      webhook_secret=WebhookSecret(
          # This example assumes that this secret already exists
          name="mySecret".upper(), database_name=database, schema_name=schema
      ),
      webhook_body_template=webhook_template,
      webhook_headers=webhook_headers,
  ),
)

root.notification_integrations.create(my_notification_integration)
Copy

Amazon SNS トピック(アウトバウンド)

次の例のコードは、指定された NotificationQueueAwsSnsOutbound プロパティを持つ my_aws_sns_outbound_notification_integration という名前の通知統合を表す NotificationIntegration オブジェクトを作成します。

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAwsSnsOutbound

my_notification_integration = NotificationIntegration(
  name="my_aws_sns_outbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueAwsSnsOutbound(
      aws_sns_topic_arn="arn:aws:sns:us-west-1:123456789012:sns-test-topic",
      aws_sns_role_arn="arn:aws:iam::123456789012:role/sns-test-topic",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

Microsoft Azure Event Gridトピック(出力)

次の例のコードは、指定された NotificationQueueAzureEventGridOutbound プロパティを持つ my_azure_outbound_notification_integration という名前の通知統合を表す NotificationIntegration オブジェクトを作成します。

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAzureEventGridOutbound

my_notification_integration = NotificationIntegration(
  name="my_azure_outbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueAzureEventGridOutbound(
      azure_event_grid_topic_endpoint="https://fake.queue.core.windows.net/api/events",
      azure_tenant_id="fake.onmicrosoft.com",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

Microsoft Azure Event Gridトピック(入力)

次の例のコードは、指定された NotificationQueueAzureEventGridInbound プロパティを持つ my_azure_inbound_notification_integration という名前の通知統合を表す NotificationIntegration オブジェクトを作成します。

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAzureEventGridInbound

my_notification_integration = NotificationIntegration(
  name="my_azure_inbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueAzureEventGridInbound(
      azure_storage_queue_primary_uri="https://fake.queue.core.windows.net/snowapi_queue",
      azure_tenant_id="fake.onmicrosoft.com",
  ),
)

root.notification_integrations.create(my_notification_integration)
Copy

Google Pub/Subトピック(出力)

次の例のコードは、指定された NotificationQueueGcpPubsubOutbound プロパティを持つ my_gcp_outbound_notification_integration という名前の通知統合を表す NotificationIntegration オブジェクトを作成します。

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueGcpPubsubOutbound

my_notification_integration = NotificationIntegration(
  name="my_gcp_outbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueGcpPubsubOutbound(
      gcp_pubsub_topic_name="projects/fake-project-name/topics/pythonapi-test",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

Google Pub/Subトピック(入力)

次の例のコードは、指定された NotificationQueueGcpPubsubInbound プロパティを持つ my_gcp_inbound_notification_integration という名前の通知統合を表す NotificationIntegration オブジェクトを作成します。

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueGcpPubsubInbound

my_notification_integration = NotificationIntegration(
  name="my_gcp_inbound_notification_integration",
  enabled=True,
  notification_hook=NotificationQueueGcpPubsubInbound(
      gcp_pubsub_subscription_name="projects/fake-project-name/subscriptions/sub-test",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

通知統合の詳細の取得

NotificationIntegration オブジェクトを返す NotificationIntegrationResource.fetch メソッドを呼び出すことで、通知統合に関する情報を取得できます。

次の例のコードは、 my_notification_integration という名前の通知統合の情報を取得します。

my_notification_integration = root.notification_integrations["my_notification_integration"].fetch()
print(my_notification_integration.to_dict())
Copy

通知統合のリスト

NotificationIntegration オブジェクトの PagedIter 反復子を返す NotificationIntegrationCollection.iter メソッドを使用して、カタログ統合を一覧表示することができます。

次の例のコードは、名前が my で始まる通知統合をリストし、それぞれの名前を表示します。

notification_integration_iter = root.notification_integrations.iter(like="my%")
for notification_integration_obj in notification_integration_iter:
  print(notification_integration_obj.name)
Copy

通知統合のドロップ

NotificationIntegrationResource オブジェクトで通知統合をドロップできます。

次の例のコードは、 my_notification_integration 通知統合リソースオブジェクトを取得し、通知統合をドロップします。

my_notification_integration_res = root.notification_integrations["my_notification_integration"]
my_notification_integration_res.drop()
Copy