Gestion des ressources de chargement et de déchargement de données avec Python

Vous pouvez utiliser Python pour gérer les ressources de chargement et de déchargement de données dans Snowflake, y compris les volumes externes, les canaux et les zones de préparation.

Conditions préalables

Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root à partir duquel utiliser les Snowflake Python APIs.

Par exemple, le code suivant utilise les paramètres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

En utilisant l’objet Session obtenu, le code crée un objet Root pour utiliser les types et les méthodes de l’API. Pour plus d’informations, voir Connexion à Snowflake avec Snowflake Python APIs.

Gestion des zones de préparation

Vous pouvez gérer les zones de préparation Snowflake, qui sont des emplacements de fichiers de données dans le stockage cloud. Pour un aperçu des zones de préparation, voir Vue d’ensemble du chargement de données.

Les Snowflake Python APIs représentent les zones de préparation avec deux types distincts :

  • Stage : expose les propriétés d’une zone de préparation telles que son nom, son type de chiffrement, ses identifiants de connexion et les paramètres de la table de répertoires.

  • StageResource : expose les méthodes que vous pouvez utiliser pour extraire un Stage objet correspondant, télécharger et lister les fichiers sur la zone de préparation, et supprimer la zone de préparation.

Création d’une zone de préparation

Pour créer une zone de préparation, il faut d’abord créer un objet Stage, puis créer un objet StageCollection à partir de l’objet Root de l’API. En utilisant StageCollection.create, ajoutez la nouvelle zone de préparation à Snowflake.

Le code de l’exemple suivant crée un objet Stage qui représente une zone de préparation nommée my_stage avec un type de chiffrement de SNOWFLAKE_SSE (chiffrement côté serveur uniquement) :

from snowflake.core.stage import Stage, StageEncryption

my_stage = Stage(
  name="my_stage",
  encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
Copy

Ce code crée une variable StageCollection stages et utilise StageCollection.create pour créer une nouvelle zone de préparation dans Snowflake.

Obtenir les détails de la zone de préparation

Vous pouvez obtenir des informations sur une zone de préparation en appelant la méthode StageResource.fetch, qui renvoie un objet Stage.

Le code de l’exemple suivant permet d’obtenir des informations sur une zone de préparation nommée my_stage :

my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Copy

Affichage des zones de préparation

Vous pouvez répertorier des zones de préparation à l’aide de la méthode StageCollection.iter qui renvoie un itérateur PagedIter d’objets Stage.

Le code de l’exemple suivant répertorie les zones de préparation dont le nom comprend le texte my et affiche le nom de chacun :

from snowflake.core.stage import StageCollection

stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%")  # returns a PagedIter[Stage]
for stage_obj in stage_iter:
  print(stage_obj.name)
Copy

Effectuer des opérations de zone de préparation

Vous pouvez effectuer des opérations de zones de préparation courantes, telles que le téléchargement d’un fichier vers une zone de préparation et l’affichage de fichiers sur une zone de préparation, avec un objet StageResource.

Pour illustrer certaines opérations que vous pouvez effectuer avec une ressource de zone de préparation, le code de l’exemple suivant effectue les opérations suivantes :

  1. Télécharge un fichier nommé my-file.yaml dans la zone de préparation my_stage avec les options d’auto-compression et d’écrasement spécifiées.

  2. Répertorie tous les fichiers sur la zone de préparation pour vérifier que le fichier a été téléchargé avec succès.

  3. Supprime la zone de préparation.

my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]

my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)

stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
  print(stageFile)

my_stage_res.drop()
Copy

Gestion des canaux

Vous pouvez gérer les canaux Snowflake, qui sont des objets Snowflake nommés de première classe qui contiennent une instruction COPY INTO utilisée par Snowpipe pour charger des données d’une file d’attente d’acquisition dans des tables. Pour un aperçu des canaux, voir Snowpipe.

Les Snowflake Python APIs représentent des canaux avec deux types distincts :

  • Pipe : expose les propriétés d’un canal telles que son nom et l’instruction COPY INTO à utiliser par Snowpipe.

  • PipeResource : expose les méthodes que vous pouvez utiliser pour extraire un objet Pipe correspondant, actualiser le canal avec les fichiers de données mis en zone de préparation et supprimer le canal.

Créer un canal

Pour créer un canal, il faut d’abord créer un objet Pipe, puis créer un objet PipeCollection à partir de l’objet Root de l’API. En utilisant PipeCollection.create, ajoutez le nouveau canal à Snowflake.

Le code de l’exemple suivant crée un objet Pipe qui représente un canal nommé my_pipe avec l’instruction COPY INTO spécifiée :

from snowflake.core.pipe import Pipe

my_pipe = Pipe(
  name="my_pipe",
  comment="creating my pipe",
  copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)

pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
Copy

Ce code crée une variable PipeCollection pipes et utilise PipeCollection.create pour créer un nouveau canal dans Snowflake.

Obtenir les détails d’un canal

Vous pouvez obtenir des informations sur un canal en appelant la méthode PipeResource.fetch, qui renvoie un objet Pipe.

Le code de l’exemple suivant permet d’obtenir des informations sur un canal nommé my_pipe :

my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Copy

Affichage de canaux

Vous pouvez répertorier des canaux à l’aide de la méthode PipeCollection.iter qui renvoie un itérateur PagedIter d’objets Pipe.

Le code de l’exemple suivant répertorie les canaux dont le nom commence par my et imprime le nom de chacun :

from snowflake.core.pipe import PipeCollection

pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%")  # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
  print(pipe_obj.name)
Copy

Effectuer des opérations de canal

Vous pouvez effectuer des opérations de canal courantes, telles que l’actualisation d’un canal et la suppression d’un canal, avec un objet PipeResource.

Note

Seule la fonctionnalité REFRESH de ALTER PIPE est actuellement prise en charge.

Pour illustrer les opérations que vous pouvez effectuer avec une ressource de canal, le code de l’exemple suivant effectue les opérations suivantes :

  1. Extrait l’objet de ressource de canal my_pipe.

  2. Actualise le canal avec des fichiers de données mis en zone de préparation avec le préfixe (ou chemin) facultatif spécifié.

  3. Supprime le canal.

my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]

# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")

my_pipe_res.drop()
Copy

Gestion des volumes externes

Vous pouvez gérer des volumes externes, qui sont des objets Snowflake nommés, de niveau compte, que vous utilisez pour connecter Snowflake à votre stockage cloud externe pour des tables Apache Iceberg™. Pour plus d’informations, consultez la section Volume externe de Tables Apache Iceberg™.

Les Snowflake Python APIs représentent des volumes externes avec deux types distincts :

  • ExternalVolume : expose les propriétés d’un volume externe telles que son nom et ses emplacements de stockage.

  • ExternalVolumeResource : expose les méthodes que vous pouvez utiliser pour récupérer un objet ExternalVolume correspondant, puis supprimer et annuler la suppression du volume externe.

Création d’un volume externe

Pour créer un volume externe, créez d’abord un objet ExternalVolume, puis créez un objet ExternalVolumeCollection de l’objet d’API Root. En utilisant ExternalVolumeCollection.create, ajoutez le nouveau volume externe à Snowflake.

Le code de l’exemple suivant crée un objet ExternalVolume qui représente un volume externe nommé my_external_volume avec les emplacements de stockage S3 AWS spécifiés :

from snowflake.core.external_volume import (
    ExternalVolume,
    StorageLocationS3,
)

my_external_volume = ExternalVolume(
    name="my_external_volume",
    storage_locations=[
        StorageLocationS3(
            name="my-s3-us-west-1",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
        StorageLocationS3(
            name="my-s3-us-west-2",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
    ]
)

root.external_volumes.create(my_external_volume)
Copy

Obtenir les détails du volume externe

Vous pouvez obtenir des informations sur un volume externe en appelant la méthode ExternalVolumeResource.fetch, qui renvoie un objet ExternalVolume.

Le code de l’exemple suivant obtient des informations sur un volume externe nommé my_external_volume :

my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Copy

Affichage des volumes externes

Vous pouvez répertorier des volumes externes à l’aide de la méthode ExternalVolumeCollection.iter qui renvoie un itérateur PagedIter d’objets ExternalVolume.

Le code de l’exemple suivant répertorie les volumes externes dont le nom commence par my et imprime le nom de chacun :

external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
  print(external_volume_obj.name)
Copy

Exécution d’opérations sur des volumes externes

Vous pouvez effectuer des opérations courantes sur les volumes externes, telles que la suppression et l’annulation de la suppression d’un volume externe, avec un objet ExternalVolumeResource.

Pour illustrer les opérations que vous pouvez effectuer avec une ressource de volume externe, le code de l’exemple suivant effectue les opérations suivantes :

  1. Extrait l’objet de ressource de volume externe my_external_volume.

  2. Supprime le volume externe.

  3. Annule la suppression du volume externe.

my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()
Copy