Gestion des intégrations Snowflake avec Python

Vous pouvez utiliser Python pour gérer différents types d’intégrations dans Snowflake.

Conditions préalables

Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root à partir duquel utiliser les Snowflake Python APIs.

Par exemple, le code suivant utilise les paramètres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

En utilisant l’objet Session obtenu, le code crée un objet Root pour utiliser les types et les méthodes de l’API. Pour plus d’informations, voir Connexion à Snowflake avec Snowflake Python APIs.

Gestion des intégrations de catalogue

Vous pouvez gérer les intégrations de catalogue pour les tables Apache Iceberg™ de votre compte. Une intégration de catalogue est un objet Snowflake nommé de niveau compte qui stocke des informations sur la façon dont vos métadonnées de table Iceberg sont organisées lorsque vous n’utilisez pas Snowflake en tant que catalogue Iceberg ou quand vous souhaitez l’intégrer avec Snowflake Open Catalog. Pour plus d’informations, consultez la section Intégration de catalogue dans Tables Apache Iceberg™.

Note

ALTER CATALOG INTEGRATION n’est actuellement pas pris en charge.

Les Snowflake Python APIs représentent des intégrations de catalogue avec deux types distincts :

  • CatalogIntegration : expose les propriétés d’une intégration de catalogue telles que son nom, son format de table et ses paramètres de catalogue.

  • CatalogIntegrationResource : expose des méthodes que vous pouvez utiliser pour récupérer un objet CatalogIntegration correspondant et pour supprimer l’intégration de catalogue.

Création d’une intégration de catalogue

Pour créer une intégration de catalogue, il faut d’abord créer un objet CatalogIntegration, puis créer un objet CatalogIntegrationCollection à partir de l’objet Root de l’API. En utilisant CatalogIntegrationCollection.create, ajoutez la nouvelle intégration du catalogue à Snowflake.

Vous pouvez créer des intégrations de catalogue dans votre compte pour les types de catalogues Iceberg externes suivants.

AWS Glue

Le code de l’exemple suivant crée un objet CatalogIntegration qui représente une intégration de catalogue nommée my_catalog_integration pour les tables Iceberg qui utilisent AWS Glue avec les propriétés spécifiées :

from snowflake.core.catalog_integration import CatalogIntegration, Glue

root.catalog_integrations.create(CatalogIntegration(
    name="my_catalog_integration",
    catalog = Glue(
        catalog_namespace="abcd-ns",
        glue_aws_role_arn="arn:aws:iam::123456789012:role/sqsAccess",
        glue_catalog_id="1234567",
    ),
    table_format="ICEBERG",
    enabled=True,
))
Copy

Magasin d’objets

Le code de l’exemple suivant crée un objet CatalogIntegration qui représente une intégration de catalogue nommée my_catalog_integration pour les tables Iceberg qui utilisent un magasin d’objets :

from snowflake.core.catalog_integration import CatalogIntegration, ObjectStore

root.catalog_integrations.create(CatalogIntegration(
    name="my_catalog_integration",
    catalog = ObjectStore(),
    table_format="ICEBERG",
    enabled=True,
))
Copy

Snowflake Open Catalog

Le code de l’exemple suivant crée un objet CatalogIntegration qui représente une intégration de catalogue nommée my_catalog_integration pour les tables Iceberg qui utilisent Open Catalog avec les propriétés spécifiées :

from snowflake.core.catalog_integration import CatalogIntegration, OAuth, Polaris, RestConfig

root.catalog_integrations.create(CatalogIntegration(
    name="my_catalog_integration",
    catalog = Polaris(
        catalog_namespace="abcd-ns",
        rest_config=RestConfig(
            catalog_uri="https://my_account.snowflakecomputing.com/polaris/api/catalog",
            warehouse="my-warehouse",
        ),
        rest_authentication=OAuth(
            type="OAUTH",
            oauth_client_id="my_client_id",
            oauth_client_secret="my_client_secret",
            oauth_allowed_scopes=["PRINCIPAL_ROLE:ALL"],
        ),
    ),
    table_format="ICEBERG",
    enabled=True,
))
Copy

Obtenir les détails de l’intégration du catalogue

Vous pouvez obtenir des informations sur une intégration de catalogue en appelant la méthode CatalogIntegrationResource.fetch, qui renvoie un objet CatalogIntegration.

Le code de l’exemple suivant permet d’obtenir des informations sur une intégration de catalogue nommée my_catalog_integration :

my_catalog_integration = root.catalog_integrations["my_catalog_integration"].fetch()
print(my_catalog_integration.to_dict())
Copy

Affichage des intégrations de catalogue

Vous pouvez répertorier des intégrations de catalogue à l’aide de la méthode CatalogIntegrationCollection.iter qui renvoie un itérateur PagedIter d’objets CatalogIntegration.

Le code de l’exemple suivant répertorie les intégrations de catalogue dont le nom commence par my, et imprime le nom de chacun :

catalog_integration_iter = root.catalog_integrations.iter(like="my%")
for catalog_integration_obj in catalog_integration_iter:
  print(catalog_integration_obj.name)
Copy

Suppression d’une intégration de catalogue

Vous pouvez supprimer une intégration de catalogue avec un objet CatalogIntegrationResource.

Le code dans l’exemple suivant extrait l’objet ressource d’intégration de catalogue my_catalog_integration, puis supprime l’intégration de catalogue.

my_catalog_integration_res = root.catalog_integrations["my_catalog_integration"]
my_catalog_integration_res.drop()
Copy

Gestion des intégrations de notification

Vous pouvez gérer les intégrations de notifications, qui sont des objets Snowflake fournissant une interface entre Snowflake et des services de messagerie tiers tels que des services de mise en file d’attente de messages dans le Cloud tiers, des services de messagerie et des webhooks. Pour plus d’informations, voir Notifications dans Snowflake.

Note

ALTER NOTIFICATION INTEGRATION n’est actuellement pas pris en charge.

Les Snowflake Python APIs représentent des intégrations de notification avec deux types distincts :

  • NotificationIntegration : expose les propriétés d’une intégration de notification telles que son nom et ses paramètres de hook de notification.

  • NotificationIntegrationResource : expose des méthodes que vous pouvez utiliser pour extraire un objet NotificationIntegration correspondant et pour supprimer l’intégration de notification.

Création d’une intégration de notification

Pour créer une intégration de notification, il faut d’abord créer un objet NotificationIntegration, puis créer un objet NotificationIntegrationCollection à partir de l’objet Root de l’API. En utilisant NotificationIntegrationCollection.create, ajoutez la nouvelle intégration de notification à Snowflake.

Vous pouvez créer une intégration de notification pour les types de services de messagerie suivants.

E-mail

Le code dans l’exemple suivant crée un objet NotificationIntegration qui représente une intégration de notification nommée my_email_notification_integration avec les propriétés NotificationEmail spécifiées :

from snowflake.core.notification_integration import NotificationEmail, NotificationIntegration

my_notification_integration = NotificationIntegration(
  name="my_email_notification_integration",
  notification_hook=NotificationEmail(
      allowed_recipients=["test1@snowflake.com", "test2@snowflake.com"],
      default_recipients=["test1@snowflake.com"],
      default_subject="test default subject",
  ),
  enabled=True,
)

root.notification_integrations.create(my_notification_integration)
Copy

Webhooks

Le code dans l’exemple suivant crée un objet NotificationIntegration qui représente une intégration de notification nommée my_webhook_notification_integration avec les propriétés NotificationWebhook spécifiées :

from snowflake.core.notification_integration import NotificationIntegration, NotificationWebhook

my_notification_integration = NotificationIntegration(
  name="my_webhook_notification_integration",
  enabled=False,
  notification_hook=NotificationWebhook(
      webhook_url=webhook_url,
      webhook_secret=WebhookSecret(
          # This example assumes that this secret already exists
          name="mySecret".upper(), database_name=database, schema_name=schema
      ),
      webhook_body_template=webhook_template,
      webhook_headers=webhook_headers,
  ),
)

root.notification_integrations.create(my_notification_integration)
Copy

Sujets Amazon SNS (sortants)

Le code dans l’exemple suivant crée un objet NotificationIntegration qui représente une intégration de notification nommée my_aws_sns_outbound_notification_integration avec les propriétés NotificationQueueAwsSnsOutbound spécifiées :

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAwsSnsOutbound

my_notification_integration = NotificationIntegration(
  name="my_aws_sns_outbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueAwsSnsOutbound(
      aws_sns_topic_arn="arn:aws:sns:us-west-1:123456789012:sns-test-topic",
      aws_sns_role_arn="arn:aws:iam::123456789012:role/sns-test-topic",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

Sujets Azure Event Grid (sortant)

Le code dans l’exemple suivant crée un objet NotificationIntegration qui représente une intégration de notification nommée my_azure_outbound_notification_integration avec les propriétés NotificationQueueAzureEventGridOutbound spécifiées :

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAzureEventGridOutbound

my_notification_integration = NotificationIntegration(
  name="my_azure_outbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueAzureEventGridOutbound(
      azure_event_grid_topic_endpoint="https://fake.queue.core.windows.net/api/events",
      azure_tenant_id="fake.onmicrosoft.com",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

Sujets Azure Event Grid (entrant)

Le code dans l’exemple suivant crée un objet NotificationIntegration qui représente une intégration de notification nommée my_azure_inbound_notification_integration avec les propriétés NotificationQueueAzureEventGridInbound spécifiées :

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueAzureEventGridInbound

my_notification_integration = NotificationIntegration(
  name="my_azure_inbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueAzureEventGridInbound(
      azure_storage_queue_primary_uri="https://fake.queue.core.windows.net/snowapi_queue",
      azure_tenant_id="fake.onmicrosoft.com",
  ),
)

root.notification_integrations.create(my_notification_integration)
Copy

Rubriques Google Pub/Sub (sortant)

Le code dans l’exemple suivant crée un objet NotificationIntegration qui représente une intégration de notification nommée my_gcp_outbound_notification_integration avec les propriétés NotificationQueueGcpPubsubOutbound spécifiées :

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueGcpPubsubOutbound

my_notification_integration = NotificationIntegration(
  name="my_gcp_outbound_notification_integration",
  enabled=False,
  notification_hook=NotificationQueueGcpPubsubOutbound(
      gcp_pubsub_topic_name="projects/fake-project-name/topics/pythonapi-test",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

Rubriques Google Pub/Sub (entrant)

Le code dans l’exemple suivant crée un objet NotificationIntegration qui représente une intégration de notification nommée my_gcp_inbound_notification_integration avec les propriétés NotificationQueueGcpPubsubInbound spécifiées :

from snowflake.core.notification_integration import NotificationIntegration, NotificationQueueGcpPubsubInbound

my_notification_integration = NotificationIntegration(
  name="my_gcp_inbound_notification_integration",
  enabled=True,
  notification_hook=NotificationQueueGcpPubsubInbound(
      gcp_pubsub_subscription_name="projects/fake-project-name/subscriptions/sub-test",
  )
)

root.notification_integrations.create(my_notification_integration)
Copy

Récupération des détails de l’intégration des notifications

Vous pouvez obtenir des informations sur une intégration de notification en appelant la méthode NotificationIntegrationResource.fetch, qui renvoie un objet NotificationIntegration.

Le code de l’exemple suivant permet d’obtenir des informations sur une intégration de notification nommée my_notification_integration :

my_notification_integration = root.notification_integrations["my_notification_integration"].fetch()
print(my_notification_integration.to_dict())
Copy

Affichage des intégrations de notification

Vous pouvez répertorier des intégrations de notification à l’aide de la méthode NotificationIntegrationCollection.iter qui renvoie un itérateur PagedIter d’objets NotificationIntegration.

Le code de l’exemple suivant répertorie les intégrations de notification dont le nom commence par my, et imprime le nom de chacun :

notification_integration_iter = root.notification_integrations.iter(like="my%")
for notification_integration_obj in notification_integration_iter:
  print(notification_integration_obj.name)
Copy

Suppression d’une intégration de notification

Vous pouvez supprimer une intégration de notification avec un objet NotificationIntegrationResource.

Le code dans l’exemple suivant extrait l’objet ressource d’intégration de notification my_notification_integration, puis supprime l’intégration de notification.

my_notification_integration_res = root.notification_integrations["my_notification_integration"]
my_notification_integration_res.drop()
Copy