PythonでSnowflake統合を管理する¶
Pythonを使用して、Snowflakeでさまざまなタイプの統合を管理できます。
前提条件¶
このトピックの例では、Snowflakeと接続するコードを追加して Root
オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。
たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
出来上がった Session
オブジェクトを使って、コードは API のタイプとメソッドを使う Root
オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。
カタログ統合の管理¶
アカウントで Apache Iceberg™ テーブルのカタログ統合を管理できます。カタログ統合とは、 SnowflakeをIcebergカタログとして使用しない場合、または Snowflake Open Catalog と統合したい場合に、シナリオ用のIcebergテーブルのメタデータをどのように整理するかについての情報を格納する、名前付きのアカウントレベルのSnowflakeオブジェクトです。詳細については、 Apache Iceberg™ テーブル の カタログ統合 セクションをご参照ください。
注釈
ALTER CATALOG INTEGRATION は現在サポートされていません。
Snowflake Python APIs は、2つのタイプに分かれたカタログ統合を表しています。
CatalogIntegration
: カタログ統合の名前、テーブル形式、カタログ設定などのプロパティを公開します。CatalogIntegrationResource
: 対応するCatalogIntegration
オブジェクトを取得し、データベースを削除するために使用できるメソッドを公開します。
カタログ統合の作成¶
カタログ統合を作成するには、まず CatalogIntegration
オブジェクトを作成して、 API Root
オブジェクトから CatalogIntegrationCollection
オブジェクトを作成します。 CatalogIntegrationCollection.create
を使用して、新しいカタログ統合をSnowflakeに追加します。
アカウントで、次のタイプの外部 Iceberg カタログのカタログ統合を作成できます。
AWS Glue¶
以下の例のコードは、指定されたプロパティを持つ AWS Glue を使用する Iceberg テーブルの my_catalog_integration
という名前のカタログ統合を表す CatalogIntegration
オブジェクトを作成します。
from snowflake.core.catalog_integration import CatalogIntegration, Glue
root.catalog_integrations.create(CatalogIntegration(
name="my_catalog_integration",
catalog = Glue(
catalog_namespace="abcd-ns",
glue_aws_role_arn="arn:aws:iam::123456789012:role/sqsAccess",
glue_catalog_id="1234567",
),
table_format="ICEBERG",
enabled=True,
))
オブジェクトストア¶
次の例のコードは、オブジェクト・ストアを使用するIcebergテーブルの my_catalog_integration
というカタログ統合を表す CatalogIntegration
オブジェクトを作成します。
from snowflake.core.catalog_integration import CatalogIntegration, ObjectStore
root.catalog_integrations.create(CatalogIntegration(
name="my_catalog_integration",
catalog = ObjectStore(),
table_format="ICEBERG",
enabled=True,
))
Snowflake Open Catalog¶
以下の例のコードは、指定されたプロパティを持つ Open Catalog を使用する Iceberg テーブルの my_catalog_integration
という名前のカタログ統合を表す CatalogIntegration
オブジェクトを作成します。
from snowflake.core.catalog_integration import CatalogIntegration, OAuth, Polaris, RestConfig
root.catalog_integrations.create(CatalogIntegration(
name="my_catalog_integration",
catalog = Polaris(
catalog_namespace="abcd-ns",
rest_config=RestConfig(
catalog_uri="https://my_account.snowflakecomputing.com/polaris/api/catalog",
warehouse="my-warehouse",
),
rest_authentication=OAuth(
type="OAUTH",
oauth_client_id="my_client_id",
oauth_client_secret="my_client_secret",
oauth_allowed_scopes=["PRINCIPAL_ROLE:ALL"],
),
),
table_format="ICEBERG",
enabled=True,
))
カタログ統合の詳細の取得¶
CatalogIntegration
オブジェクトを返す CatalogIntegrationResource.fetch
メソッドを呼び出すことで、カタログ統合に関する情報を取得できます。
次の例のコードは、 my_catalog_integration
という名前のカタログ統合の情報を取得します。
my_catalog_integration = root.catalog_integrations["my_catalog_integration"].fetch()
print(my_catalog_integration.to_dict())
カタログ統合のリスト¶
CatalogIntegration
オブジェクトの PagedIter
反復子を返す CatalogIntegrationCollection.iter
メソッドを使用して、カタログ統合を一覧表示することができます。
次の例のコードは、名前が my
で始まるカタログ統合を一覧表示し、それぞれの名前を表示します。
catalog_integration_iter = root.catalog_integrations.iter(like="my%")
for catalog_integration_obj in catalog_integration_iter:
print(catalog_integration_obj.name)
カタログ統合のドロップ¶
カタログ統合を CatalogIntegrationResource
オブジェクトにドロップできます。
次の例のコードは、 my_catalog_integration
カタログ統合リソース・オブジェクトを取得し、カタログ統合をドロップします。
my_catalog_integration_res = root.catalog_integrations["my_catalog_integration"]
my_catalog_integration_res.drop()
通知統合の管理¶
通知統合を管理することができます。これは、サードパーティのクラウドメッセージキューイングサービス、メールサービス、WebhookなどのサードパーティのメッセージングサービスとSnowflakeの間のインタフェースを提供するSnowflakeオブジェクトです。詳細については、 Snowflakeでの通知 をご参照ください。
注釈
ALTER NOTIFICATION INTEGRATION は現在サポートされていません。
Snowflake Python APIs は、2つのタイプに分かれた通知統合を表しています。
NotificationIntegration
: 通知統合の名前や通知フック設定などのプロパティを公開します。NotificationIntegrationResource
: 対応するNotificationIntegration
オブジェクトを取得し、通知統合をドロップするために使用できるメソッドを公開します。
通知統合の作成¶
通知統合を作成するには、まず NotificationIntegration
オブジェクトを作成して、 API Root
オブジェクトから NotificationIntegrationCollection
オブジェクトを作成します。 NotificationIntegrationCollection.create
を使用して、新しい通知統合をSnowflakeに追加します。
以下のタイプのメッセージングサービスの通知統合を作成できます。
メール¶
次の例のコードは、指定された NotificationEmail
プロパティを持つ my_email_notification_integration
という名前の通知統合を表す NotificationIntegration
オブジェクトを作成します。
from snowflake.core.notification_integration import NotificationEmail, NotificationIntegration
my_notification_integration = NotificationIntegration(
name="my_email_notification_integration",
notification_hook=NotificationEmail(
allowed_recipients=["test1@snowflake.com", "test2@snowflake.com"],
default_recipients=["test1@snowflake.com"],
default_subject="test default subject",
),
enabled=True,
)
root.notification_integrations.create(my_notification_integration)
ウェブフック¶
次の例のコードは、指定された NotificationWebhook
プロパティを持つ my_webhook_notification_integration
という名前の通知統合を表す NotificationIntegration
オブジェクトを作成します。
from snowflake.core.notification_integration import NotificationIntegration, NotificationWebhook
my_notification_integration = NotificationIntegration(
name="my_webhook_notification_integration",
enabled=False,
notification_hook=NotificationWebhook(
webhook_url=webhook_url,
webhook_secret=WebhookSecret(
# This example assumes that this secret already exists
name="mySecret".upper(), database_name=database, schema_name=schema
),
webhook_body_template=webhook_template,
webhook_headers=webhook_headers,
),
)
root.notification_integrations.create(my_notification_integration)
Amazon SNS トピック(アウトバウンド)¶
次の例のコードは、指定された NotificationQueueAwsSnsOutbound
プロパティを持つ my_aws_sns_outbound_notification_integration
という名前の通知統合を表す NotificationIntegration
オブジェクトを作成します。
from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAwsSnsOutbound
my_notification_integration = NotificationIntegration(
name="my_aws_sns_outbound_notification_integration",
enabled=False,
notification_hook=NotificationQueueAwsSnsOutbound(
aws_sns_topic_arn="arn:aws:sns:us-west-1:123456789012:sns-test-topic",
aws_sns_role_arn="arn:aws:iam::123456789012:role/sns-test-topic",
)
)
root.notification_integrations.create(my_notification_integration)
Microsoft Azure Event Gridトピック(出力)¶
次の例のコードは、指定された NotificationQueueAzureEventGridOutbound
プロパティを持つ my_azure_outbound_notification_integration
という名前の通知統合を表す NotificationIntegration
オブジェクトを作成します。
from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAzureEventGridOutbound
my_notification_integration = NotificationIntegration(
name="my_azure_outbound_notification_integration",
enabled=False,
notification_hook=NotificationQueueAzureEventGridOutbound(
azure_event_grid_topic_endpoint="https://fake.queue.core.windows.net/api/events",
azure_tenant_id="fake.onmicrosoft.com",
)
)
root.notification_integrations.create(my_notification_integration)
Microsoft Azure Event Gridトピック(入力)¶
次の例のコードは、指定された NotificationQueueAzureEventGridInbound
プロパティを持つ my_azure_inbound_notification_integration
という名前の通知統合を表す NotificationIntegration
オブジェクトを作成します。
from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAzureEventGridInbound
my_notification_integration = NotificationIntegration(
name="my_azure_inbound_notification_integration",
enabled=False,
notification_hook=NotificationQueueAzureEventGridInbound(
azure_storage_queue_primary_uri="https://fake.queue.core.windows.net/snowapi_queue",
azure_tenant_id="fake.onmicrosoft.com",
),
)
root.notification_integrations.create(my_notification_integration)
Google Pub/Subトピック(出力)¶
次の例のコードは、指定された NotificationQueueGcpPubsubOutbound
プロパティを持つ my_gcp_outbound_notification_integration
という名前の通知統合を表す NotificationIntegration
オブジェクトを作成します。
from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueGcpPubsubOutbound
my_notification_integration = NotificationIntegration(
name="my_gcp_outbound_notification_integration",
enabled=False,
notification_hook=NotificationQueueGcpPubsubOutbound(
gcp_pubsub_topic_name="projects/fake-project-name/topics/pythonapi-test",
)
)
root.notification_integrations.create(my_notification_integration)
Google Pub/Subトピック(入力)¶
次の例のコードは、指定された NotificationQueueGcpPubsubInbound
プロパティを持つ my_gcp_inbound_notification_integration
という名前の通知統合を表す NotificationIntegration
オブジェクトを作成します。
from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueGcpPubsubInbound
my_notification_integration = NotificationIntegration(
name="my_gcp_inbound_notification_integration",
enabled=True,
notification_hook=NotificationQueueGcpPubsubInbound(
gcp_pubsub_subscription_name="projects/fake-project-name/subscriptions/sub-test",
)
)
root.notification_integrations.create(my_notification_integration)
通知統合の詳細の取得¶
NotificationIntegration
オブジェクトを返す NotificationIntegrationResource.fetch
メソッドを呼び出すことで、通知統合に関する情報を取得できます。
次の例のコードは、 my_notification_integration
という名前の通知統合の情報を取得します。
my_notification_integration = root.notification_integrations["my_notification_integration"].fetch()
print(my_notification_integration.to_dict())
通知統合のリスト¶
NotificationIntegration
オブジェクトの PagedIter
反復子を返す NotificationIntegrationCollection.iter
メソッドを使用して、カタログ統合を一覧表示することができます。
次の例のコードは、名前が my
で始まる通知統合をリストし、それぞれの名前を表示します。
notification_integration_iter = root.notification_integrations.iter(like="my%")
for notification_integration_obj in notification_integration_iter:
print(notification_integration_obj.name)
通知統合のドロップ¶
NotificationIntegrationResource
オブジェクトで通知統合をドロップできます。
次の例のコードは、 my_notification_integration
通知統合リソースオブジェクトを取得し、通知統合をドロップします。
my_notification_integration_res = root.notification_integrations["my_notification_integration"]
my_notification_integration_res.drop()