Gestion des ressources de chargement et de déchargement de données avec Python¶
Vous pouvez utiliser Python pour gérer les ressources de chargement et de déchargement de données dans Snowflake, y compris les volumes externes, les canaux et les zones de préparation.
Conditions préalables¶
Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root
à partir duquel utiliser les Snowflake Python APIs.
Par exemple, le code suivant utilise les paramètres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
En utilisant l’objet Session
obtenu, le code crée un objet Root
pour utiliser les types et les méthodes de l’API. Pour plus d’informations, voir Connexion à Snowflake avec Snowflake Python APIs.
Gestion des zones de préparation¶
Vous pouvez gérer les zones de préparation Snowflake, qui sont des emplacements de fichiers de données dans le stockage cloud. Pour un aperçu des zones de préparation, voir Vue d’ensemble du chargement de données.
Les Snowflake Python APIs représentent les zones de préparation avec deux types distincts :
Stage
: expose les propriétés d’une zone de préparation telles que son nom, son type de chiffrement, ses identifiants de connexion et les paramètres de la table de répertoires.StageResource
: expose les méthodes que vous pouvez utiliser pour extraire unStage
objet correspondant, télécharger et lister les fichiers sur la zone de préparation, et supprimer la zone de préparation.
Création d’une zone de préparation¶
Pour créer une zone de préparation, il faut d’abord créer un objet Stage
, puis créer un objet StageCollection
à partir de l’objet Root
de l’API. En utilisant StageCollection.create
, ajoutez la nouvelle zone de préparation à Snowflake.
Le code de l’exemple suivant crée un objet Stage
qui représente une zone de préparation nommée my_stage
avec un type de chiffrement de SNOWFLAKE_SSE
(chiffrement côté serveur uniquement) :
from snowflake.core.stage import Stage, StageEncryption
my_stage = Stage(
name="my_stage",
encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
Ce code crée une variable StageCollection
stages
et utilise StageCollection.create
pour créer une nouvelle zone de préparation dans Snowflake.
Obtenir les détails de la zone de préparation¶
Vous pouvez obtenir des informations sur une zone de préparation en appelant la méthode StageResource.fetch
, qui renvoie un objet Stage
.
Le code de l’exemple suivant permet d’obtenir des informations sur une zone de préparation nommée my_stage
:
my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Affichage des zones de préparation¶
Vous pouvez répertorier des zones de préparation à l’aide de la méthode StageCollection.iter
qui renvoie un itérateur PagedIter
d’objets Stage
.
Le code de l’exemple suivant répertorie les zones de préparation dont le nom comprend le texte my
et affiche le nom de chacun :
from snowflake.core.stage import StageCollection
stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%") # returns a PagedIter[Stage]
for stage_obj in stage_iter:
print(stage_obj.name)
Effectuer des opérations de zone de préparation¶
Vous pouvez effectuer des opérations de zones de préparation courantes, telles que le téléchargement d’un fichier vers une zone de préparation et l’affichage de fichiers sur une zone de préparation, avec un objet StageResource
.
Pour illustrer certaines opérations que vous pouvez effectuer avec une ressource de zone de préparation, le code de l’exemple suivant effectue les opérations suivantes :
Télécharge un fichier nommé
my-file.yaml
dans la zone de préparationmy_stage
avec les options d’auto-compression et d’écrasement spécifiées.Répertorie tous les fichiers sur la zone de préparation pour vérifier que le fichier a été téléchargé avec succès.
Supprime la zone de préparation.
my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]
my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)
stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
print(stageFile)
my_stage_res.drop()
Gestion des canaux¶
Vous pouvez gérer les canaux Snowflake, qui sont des objets Snowflake nommés de première classe qui contiennent une instruction COPY INTO utilisée par Snowpipe pour charger des données d’une file d’attente d’acquisition dans des tables. Pour un aperçu des canaux, voir Snowpipe.
Les Snowflake Python APIs représentent des canaux avec deux types distincts :
Pipe
: expose les propriétés d’un canal telles que son nom et l’instruction COPY INTO à utiliser par Snowpipe.PipeResource
: expose les méthodes que vous pouvez utiliser pour extraire un objetPipe
correspondant, actualiser le canal avec les fichiers de données mis en zone de préparation et supprimer le canal.
Créer un canal¶
Pour créer un canal, il faut d’abord créer un objet Pipe
, puis créer un objet PipeCollection
à partir de l’objet Root
de l’API. En utilisant PipeCollection.create
, ajoutez le nouveau canal à Snowflake.
Le code de l’exemple suivant crée un objet Pipe
qui représente un canal nommé my_pipe
avec l’instruction COPY INTO spécifiée :
from snowflake.core.pipe import Pipe
my_pipe = Pipe(
name="my_pipe",
comment="creating my pipe",
copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)
pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
Ce code crée une variable PipeCollection
pipes
et utilise PipeCollection.create
pour créer un nouveau canal dans Snowflake.
Obtenir les détails d’un canal¶
Vous pouvez obtenir des informations sur un canal en appelant la méthode PipeResource.fetch
, qui renvoie un objet Pipe
.
Le code de l’exemple suivant permet d’obtenir des informations sur un canal nommé my_pipe
:
my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Affichage de canaux¶
Vous pouvez répertorier des canaux à l’aide de la méthode PipeCollection.iter
qui renvoie un itérateur PagedIter
d’objets Pipe
.
Le code de l’exemple suivant répertorie les canaux dont le nom commence par my
et imprime le nom de chacun :
from snowflake.core.pipe import PipeCollection
pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%") # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
print(pipe_obj.name)
Effectuer des opérations de canal¶
Vous pouvez effectuer des opérations de canal courantes, telles que l’actualisation d’un canal et la suppression d’un canal, avec un objet PipeResource
.
Note
Seule la fonctionnalité REFRESH de ALTER PIPE est actuellement prise en charge.
Pour illustrer les opérations que vous pouvez effectuer avec une ressource de canal, le code de l’exemple suivant effectue les opérations suivantes :
Extrait l’objet de ressource de canal
my_pipe
.Actualise le canal avec des fichiers de données mis en zone de préparation avec le préfixe (ou chemin) facultatif spécifié.
Supprime le canal.
my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]
# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")
my_pipe_res.drop()
Gestion des volumes externes¶
Vous pouvez gérer des volumes externes, qui sont des objets Snowflake nommés, de niveau compte, que vous utilisez pour connecter Snowflake à votre stockage cloud externe pour des tables Apache Iceberg™. Pour plus d’informations, consultez la section Volume externe de Tables Apache Iceberg™.
Les Snowflake Python APIs représentent des volumes externes avec deux types distincts :
ExternalVolume
: expose les propriétés d’un volume externe telles que son nom et ses emplacements de stockage.ExternalVolumeResource
: expose les méthodes que vous pouvez utiliser pour récupérer un objetExternalVolume
correspondant, puis supprimer et annuler la suppression du volume externe.
Création d’un volume externe¶
Pour créer un volume externe, créez d’abord un objet ExternalVolume
, puis créez un objet ExternalVolumeCollection
de l’objet d’API Root
. En utilisant ExternalVolumeCollection.create
, ajoutez le nouveau volume externe à Snowflake.
Le code de l’exemple suivant crée un objet ExternalVolume
qui représente un volume externe nommé my_external_volume
avec les emplacements de stockage S3 AWS spécifiés :
from snowflake.core.external_volume import (
ExternalVolume,
StorageLocationS3,
)
my_external_volume = ExternalVolume(
name="my_external_volume",
storage_locations=[
StorageLocationS3(
name="my-s3-us-west-1",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
StorageLocationS3(
name="my-s3-us-west-2",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
]
)
root.external_volumes.create(my_external_volume)
Obtenir les détails du volume externe¶
Vous pouvez obtenir des informations sur un volume externe en appelant la méthode ExternalVolumeResource.fetch
, qui renvoie un objet ExternalVolume
.
Le code de l’exemple suivant obtient des informations sur un volume externe nommé my_external_volume
:
my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Affichage des volumes externes¶
Vous pouvez répertorier des volumes externes à l’aide de la méthode ExternalVolumeCollection.iter
qui renvoie un itérateur PagedIter
d’objets ExternalVolume
.
Le code de l’exemple suivant répertorie les volumes externes dont le nom commence par my
et imprime le nom de chacun :
external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
print(external_volume_obj.name)
Exécution d’opérations sur des volumes externes¶
Vous pouvez effectuer des opérations courantes sur les volumes externes, telles que la suppression et l’annulation de la suppression d’un volume externe, avec un objet ExternalVolumeResource
.
Pour illustrer les opérations que vous pouvez effectuer avec une ressource de volume externe, le code de l’exemple suivant effectue les opérations suivantes :
Extrait l’objet de ressource de volume externe
my_external_volume
.Supprime le volume externe.
Annule la suppression du volume externe.
my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()