Verwalten des Ladens von Daten und des Entladens von Ressourcen mit Python¶
Sie können Python verwenden, um das Laden und Entladen von Ressourcen in Snowflake zu verwalten, einschließlich externer Volumes, Pipes und Stagingbereiche.
Voraussetzungen¶
Die Beispiele in diesem Thema gehen davon aus, dass Sie Code hinzugefügt haben, um eine Verbindung zu Snowflake herzustellen und ein Root
-Objekt zu erstellen, von dem aus Sie die Snowflake Python APIs verwenden können.
Beispielsweise verwendet der folgende Code Verbindungsparameter, die in einer Konfigurationsdatei definiert sind, um eine Verbindung zu Snowflake zu erstellen:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Unter Verwendung des resultierenden Session
-Objekts erstellt der Code ein Root
-Objekt, das die Typen und Methoden der API verwendet. Weitere Informationen dazu finden Sie unter Verbinden mit Snowflake mit dem Snowflake Python APIs.
Verwalten von Stagingbereichen¶
Sie können Snowflake-Stagingbereiche verwalten, d. h. Speicherorte von Dateien im Cloudspeicher. Einen Überblick über die Stagingbereiche finden Sie unter Übersicht zum Laden von Daten.
Die Snowflake Python APIs stellt Stagingbereiche mit zwei verschiedenen Typen dar:
Stage
: Zeigt die Eigenschaften eines Stagingbereichs an, z. B. Name, Verschlüsselungstyp, Anmeldeinformationen und Einstellungen für die Verzeichnistabelle.StageResource
: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendesStage
-Objekt abrufen, Dateien auf den Stagingbereich hochladen und auflisten und den Stagingbereich löschen können.
Erstellen eines Stagingbereichs¶
Um einen Stagingbereich zu erstellen, erstellen Sie zunächst ein Stage
-Objekt und dann ein StageCollection
-Objekt aus dem API Root
-Objekt. Fügen Sie mit StageCollection.create
den neuen Stagingbereich zu Snowflake hinzu.
Der Code im folgenden Beispiel erstellt ein Stage
-Objekt, das einen Stagingbereich namens my_stage
mit dem Verschlüsselungstyp SNOWFLAKE_SSE
(nur serverseitige Verschlüsselung) darstellt:
from snowflake.core.stage import Stage, StageEncryption
my_stage = Stage(
name="my_stage",
encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
Der Code erstellt eine StageCollection
-Variable stages
und verwendet StageCollection.create
, um einen neuen Stagingbereich in Snowflake zu erstellen.
Abrufen von Details zum Stagingbereich¶
Sie können Informationen über einen Stagingbereich erhalten, indem Sie die StageResource.fetch
-Methode aufrufen, die ein Stage
-Objekt zurückgibt.
Der Code im folgenden Beispiel ruft Informationen über einen Stagingbereich namens my_stage
ab:
my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Auflistung von Stagingbereichen¶
Sie können Stagingbereiche mit der StageCollection.iter
-Methode auflisten, die einen PagedIter
-Iterator von Stage
-Objekten zurückgibt.
Der Code im folgenden Beispiel listet Stagingbereiche auf, deren Name den my
-Text enthält, und gibt den Namen jedes einzelnen aus:
from snowflake.core.stage import StageCollection
stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%") # returns a PagedIter[Stage]
for stage_obj in stage_iter:
print(stage_obj.name)
Ausführen von Stagingbereichsoperationen¶
Sie können gängige Stagingbereichsoperationen - wie das Hochladen einer Datei auf einen Stagingbereich und das Auflisten von Dateien auf einem Stagingbereich - mit einem StageResource
-Objekt durchführen.
Um einige Operationen zu demonstrieren, die Sie mit einer Ressource im Stagingbereich durchführen können, führt der Code im folgenden Beispiel Folgendes aus:
Lädt eine Datei mit dem Namen
my-file.yaml
in denmy_stage
-Stagingbereich mit den angegebenen Optionen für die automatische Komprimierung und das Überschreiben hoch.Listet alle Dateien im Stagingbereich auf, um zu überprüfen, ob die Datei erfolgreich hochgeladen wurde.
Löscht den Stagingbereich.
my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]
my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)
stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
print(stageFile)
my_stage_res.drop()
Verwalten von Pipes¶
Sie können Snowflake-Pipes verwalten. Dabei handelt es sich um benannte First-Class-Snowflake-Objekte, die eine COPY INTO-Anweisung enthalten, die von Snowpipe zum Laden von Daten aus einer Erfassungswarteschlange in Tabellen verwendet wird. Einen Überblick über die Pipes finden Sie unter Snowpipe.
Die Snowflake Python APIs stellt Rohre mit zwei verschiedenen Typen dar:
Pipe
: Zeigt die Eigenschaften einer Pipe an, wie z. B. ihren Namen und die COPY INTO Anweisung, die von Snowpipe verwendet werden soll.PipeResource
: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendesPipe
-Objekt abrufen, die Pipe mit Stagingdateien aktualisieren und die Pipe löschen können.
Erstellen einer Pipe¶
Um eine Pipe zu erstellen, erstellen Sie zunächst ein Pipe
-Objekt und dann ein PipeCollection
-Objekt aus dem API-Root
-Objekt. Fügen Sie Snowflake mit PipeCollection.create
die neue Pipe hinzu.
Der Code im folgenden Beispiel erstellt ein Pipe
-Objekt, das eine Pipe namens my_pipe
mit der angegebenen COPY INTO-Anweisung darstellt:
from snowflake.core.pipe import Pipe
my_pipe = Pipe(
name="my_pipe",
comment="creating my pipe",
copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)
pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
Der Code erstellt eine PipeCollection
-Variable pipes
und verwendet PipeCollection.create
, um eine neue Pipe in Snowflake zu erstellen.
Details zu den Pipes erhalten¶
Sie können Informationen über eine Pipe erhalten, indem Sie die PipeResource.fetch
-Methode aufrufen, die ein Pipe
-Objekt zurückgibt.
Der Code im folgenden Beispiel ruft Informationen über eine Pipe namens my_pipe
ab:
my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Auflistung von Pipes¶
Sie können Pipes mit der PipeCollection.iter
-Methode auflisten, die einen PagedIter
-Iterator von Pipe
-Objekten zurückgibt.
Der Code im folgenden Beispiel listet alle Pipes auf, deren Name mit my
beginnt, und gibt den Namen jeder Pipe aus:
from snowflake.core.pipe import PipeCollection
pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%") # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
print(pipe_obj.name)
Ausführen von Pipe-Operationen¶
Sie können gängige Operationen, wie das Aktualisieren und das Löschen einer Pipe, mit einem PipeResource
-Objekt durchführen.
Bemerkung
Derzeit wird nur die REFRESH-Funktion von ALTER PIPE unterstützt.
Zur Veranschaulichung der Operationen, die Sie mit einer Ressource durchführen können, wird der Code im folgenden Beispiel wie folgt ausgeführt:
Holt das Objekt
my_pipe
-Pipe-Ressource.Aktualisiert die Pipe mit Stagingdateien mit dem angegebenen, optionalen Präfix (oder Pfad).
Löscht die Pipe.
my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]
# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")
my_pipe_res.drop()
Verwalten von externen Volumes¶
Sie können externe Volumes verwalten, bei denen es sich um benannte Snowflake-Objekte auf Kontoebene handelt, mit denen Sie Snowflake mit Ihrem externen Cloudspeicher für Apache Iceberg™-Tabellen verbinden können. Weitere Informationen finden Sie im Abschnitt Externes Volume auf Apache Iceberg™-Tabellen.
Die Snowflake Python APIs stellt externe Volumes mit zwei verschiedenen Typen dar:
ExternalVolume
: Zeigt die Eigenschaften eines externen Volumes an, z. B. seinen Namen und Speicherorte.ExternalVolumeResource
: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendesExternalVolume
-Objekt abrufen und das externe Volume löschen und wiederherstellen können.
Erstellen eines externen Volumes¶
Um ein externes Volume zu erstellen, erstellen Sie zunächst ein ExternalVolume
-Objekt und dann ein ExternalVolumeCollection
-Objekt aus dem API Root
-Objekt. Fügen Sie Snowflake mit ExternalVolumeCollection.create
das neue externe Volume zu hinzu.
Der Code im folgenden Beispiel erstellt ein ExternalVolume
-Objekt, das ein externes Volume mit dem Namen my_external_volume
und den angegebenen AWS-S3-Speicherorten darstellt:
from snowflake.core.external_volume import (
ExternalVolume,
StorageLocationS3,
)
my_external_volume = ExternalVolume(
name="my_external_volume",
storage_locations=[
StorageLocationS3(
name="my-s3-us-west-1",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
StorageLocationS3(
name="my-s3-us-west-2",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
]
)
root.external_volumes.create(my_external_volume)
Abrufen von Details zu externen Volumes¶
Sie können Informationen über ein externes Volume erhalten, indem Sie die ExternalVolumeResource.fetch
-Methode aufrufen, die ein ExternalVolume
-Objekt zurückgibt.
Der Code im folgenden Beispiel ruft Informationen über ein externes Volume namens my_external_volume
ab:
my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Auflistung von externen Volumes¶
Sie können externe Volumes mit der ExternalVolumeCollection.iter
-Methode auflisten, die einen PagedIter
-Iterator von ExternalVolume
-Objekten zurückgibt.
Der Code im folgenden Beispiel listet externe Volumes auf, deren Name mit my
beginnt, und gibt den Namen jedes einzelnen aus:
external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
print(external_volume_obj.name)
Ausführen von Operationen mit externem Volumes¶
Sie können gängige Operationen für externe Volumes, wie z. B. das Löschen und Wiederherstellen eines externen Volumes, mit einem ExternalVolumeResource
-Objekt durchführen.
Zur Veranschaulichung der Operationen, die Sie mit einer Ressource für ein externes Volume durchführen können, wird der Code im folgenden Beispiel wie folgt ausgeführt:
Ruft das
my_external_volume
externe Volume-Ressourcen-Objekt ab.Löscht das externe Volume.
Stellt das externe Volumen wieder her.
my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()