Verwalten des Ladens von Daten und des Entladens von Ressourcen mit Python

Sie können Python verwenden, um das Laden und Entladen von Ressourcen in Snowflake zu verwalten, einschließlich externer Volumes, Pipes und Stagingbereiche.

Voraussetzungen

Die Beispiele in diesem Thema gehen davon aus, dass Sie Code hinzugefügt haben, um eine Verbindung zu Snowflake herzustellen und ein Root-Objekt zu erstellen, von dem aus Sie die Snowflake Python APIs verwenden können.

Beispielsweise verwendet der folgende Code Verbindungsparameter, die in einer Konfigurationsdatei definiert sind, um eine Verbindung zu Snowflake zu erstellen:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

Unter Verwendung des resultierenden Session-Objekts erstellt der Code ein Root-Objekt, das die Typen und Methoden der API verwendet. Weitere Informationen dazu finden Sie unter Verbinden mit Snowflake mit dem Snowflake Python APIs.

Verwalten von Stagingbereichen

Sie können Snowflake-Stagingbereiche verwalten, d. h. Speicherorte von Dateien im Cloudspeicher. Einen Überblick über die Stagingbereiche finden Sie unter Übersicht zum Laden von Daten.

Die Snowflake Python APIs stellt Stagingbereiche mit zwei verschiedenen Typen dar:

  • Stage: Zeigt die Eigenschaften eines Stagingbereichs an, z. B. Name, Verschlüsselungstyp, Anmeldeinformationen und Einstellungen für die Verzeichnistabelle.

  • StageResource: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendes Stage-Objekt abrufen, Dateien auf den Stagingbereich hochladen und auflisten und den Stagingbereich löschen können.

Erstellen eines Stagingbereichs

Um einen Stagingbereich zu erstellen, erstellen Sie zunächst ein Stage-Objekt und dann ein StageCollection-Objekt aus dem API Root-Objekt. Fügen Sie mit StageCollection.create den neuen Stagingbereich zu Snowflake hinzu.

Der Code im folgenden Beispiel erstellt ein Stage-Objekt, das einen Stagingbereich namens my_stage mit dem Verschlüsselungstyp SNOWFLAKE_SSE (nur serverseitige Verschlüsselung) darstellt:

from snowflake.core.stage import Stage, StageEncryption

my_stage = Stage(
  name="my_stage",
  encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
Copy

Der Code erstellt eine StageCollection-Variable stages und verwendet StageCollection.create, um einen neuen Stagingbereich in Snowflake zu erstellen.

Abrufen von Details zum Stagingbereich

Sie können Informationen über einen Stagingbereich erhalten, indem Sie die StageResource.fetch-Methode aufrufen, die ein Stage-Objekt zurückgibt.

Der Code im folgenden Beispiel ruft Informationen über einen Stagingbereich namens my_stage ab:

my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Copy

Auflistung von Stagingbereichen

Sie können Stagingbereiche mit der StageCollection.iter-Methode auflisten, die einen PagedIter-Iterator von Stage-Objekten zurückgibt.

Der Code im folgenden Beispiel listet Stagingbereiche auf, deren Name den my-Text enthält, und gibt den Namen jedes einzelnen aus:

from snowflake.core.stage import StageCollection

stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%")  # returns a PagedIter[Stage]
for stage_obj in stage_iter:
  print(stage_obj.name)
Copy

Ausführen von Stagingbereichsoperationen

Sie können gängige Stagingbereichsoperationen - wie das Hochladen einer Datei auf einen Stagingbereich und das Auflisten von Dateien auf einem Stagingbereich - mit einem StageResource-Objekt durchführen.

Um einige Operationen zu demonstrieren, die Sie mit einer Ressource im Stagingbereich durchführen können, führt der Code im folgenden Beispiel Folgendes aus:

  1. Lädt eine Datei mit dem Namen my-file.yaml in den my_stage-Stagingbereich mit den angegebenen Optionen für die automatische Komprimierung und das Überschreiben hoch.

  2. Listet alle Dateien im Stagingbereich auf, um zu überprüfen, ob die Datei erfolgreich hochgeladen wurde.

  3. Löscht den Stagingbereich.

my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]

my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)

stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
  print(stageFile)

my_stage_res.drop()
Copy

Verwalten von Pipes

Sie können Snowflake-Pipes verwalten. Dabei handelt es sich um benannte First-Class-Snowflake-Objekte, die eine COPY INTO-Anweisung enthalten, die von Snowpipe zum Laden von Daten aus einer Erfassungswarteschlange in Tabellen verwendet wird. Einen Überblick über die Pipes finden Sie unter Snowpipe.

Die Snowflake Python APIs stellt Rohre mit zwei verschiedenen Typen dar:

  • Pipe: Zeigt die Eigenschaften einer Pipe an, wie z. B. ihren Namen und die COPY INTO Anweisung, die von Snowpipe verwendet werden soll.

  • PipeResource: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendes Pipe-Objekt abrufen, die Pipe mit Stagingdateien aktualisieren und die Pipe löschen können.

Erstellen einer Pipe

Um eine Pipe zu erstellen, erstellen Sie zunächst ein Pipe-Objekt und dann ein PipeCollection-Objekt aus dem API-Root-Objekt. Fügen Sie Snowflake mit PipeCollection.create die neue Pipe hinzu.

Der Code im folgenden Beispiel erstellt ein Pipe-Objekt, das eine Pipe namens my_pipe mit der angegebenen COPY INTO-Anweisung darstellt:

from snowflake.core.pipe import Pipe

my_pipe = Pipe(
  name="my_pipe",
  comment="creating my pipe",
  copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)

pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
Copy

Der Code erstellt eine PipeCollection-Variable pipes und verwendet PipeCollection.create, um eine neue Pipe in Snowflake zu erstellen.

Details zu den Pipes erhalten

Sie können Informationen über eine Pipe erhalten, indem Sie die PipeResource.fetch-Methode aufrufen, die ein Pipe-Objekt zurückgibt.

Der Code im folgenden Beispiel ruft Informationen über eine Pipe namens my_pipe ab:

my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Copy

Auflistung von Pipes

Sie können Pipes mit der PipeCollection.iter-Methode auflisten, die einen PagedIter-Iterator von Pipe-Objekten zurückgibt.

Der Code im folgenden Beispiel listet alle Pipes auf, deren Name mit my beginnt, und gibt den Namen jeder Pipe aus:

from snowflake.core.pipe import PipeCollection

pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%")  # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
  print(pipe_obj.name)
Copy

Ausführen von Pipe-Operationen

Sie können gängige Operationen, wie das Aktualisieren und das Löschen einer Pipe, mit einem PipeResource-Objekt durchführen.

Bemerkung

Derzeit wird nur die REFRESH-Funktion von ALTER PIPE unterstützt.

Zur Veranschaulichung der Operationen, die Sie mit einer Ressource durchführen können, wird der Code im folgenden Beispiel wie folgt ausgeführt:

  1. Holt das Objekt my_pipe-Pipe-Ressource.

  2. Aktualisiert die Pipe mit Stagingdateien mit dem angegebenen, optionalen Präfix (oder Pfad).

  3. Löscht die Pipe.

my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]

# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")

my_pipe_res.drop()
Copy

Verwalten von externen Volumes

Sie können externe Volumes verwalten, bei denen es sich um benannte Snowflake-Objekte auf Kontoebene handelt, mit denen Sie Snowflake mit Ihrem externen Cloudspeicher für Apache Iceberg™-Tabellen verbinden können. Weitere Informationen finden Sie im Abschnitt Externes Volume auf Apache Iceberg™-Tabellen.

Die Snowflake Python APIs stellt externe Volumes mit zwei verschiedenen Typen dar:

  • ExternalVolume: Zeigt die Eigenschaften eines externen Volumes an, z. B. seinen Namen und Speicherorte.

  • ExternalVolumeResource: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendes ExternalVolume-Objekt abrufen und das externe Volume löschen und wiederherstellen können.

Erstellen eines externen Volumes

Um ein externes Volume zu erstellen, erstellen Sie zunächst ein ExternalVolume-Objekt und dann ein ExternalVolumeCollection-Objekt aus dem API Root-Objekt. Fügen Sie Snowflake mit ExternalVolumeCollection.create das neue externe Volume zu hinzu.

Der Code im folgenden Beispiel erstellt ein ExternalVolume-Objekt, das ein externes Volume mit dem Namen my_external_volume und den angegebenen AWS-S3-Speicherorten darstellt:

from snowflake.core.external_volume import (
    ExternalVolume,
    StorageLocationS3,
)

my_external_volume = ExternalVolume(
    name="my_external_volume",
    storage_locations=[
        StorageLocationS3(
            name="my-s3-us-west-1",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
        StorageLocationS3(
            name="my-s3-us-west-2",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
    ]
)

root.external_volumes.create(my_external_volume)
Copy

Abrufen von Details zu externen Volumes

Sie können Informationen über ein externes Volume erhalten, indem Sie die ExternalVolumeResource.fetch-Methode aufrufen, die ein ExternalVolume-Objekt zurückgibt.

Der Code im folgenden Beispiel ruft Informationen über ein externes Volume namens my_external_volume ab:

my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Copy

Auflistung von externen Volumes

Sie können externe Volumes mit der ExternalVolumeCollection.iter-Methode auflisten, die einen PagedIter-Iterator von ExternalVolume-Objekten zurückgibt.

Der Code im folgenden Beispiel listet externe Volumes auf, deren Name mit my beginnt, und gibt den Namen jedes einzelnen aus:

external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
  print(external_volume_obj.name)
Copy

Ausführen von Operationen mit externem Volumes

Sie können gängige Operationen für externe Volumes, wie z. B. das Löschen und Wiederherstellen eines externen Volumes, mit einem ExternalVolumeResource-Objekt durchführen.

Zur Veranschaulichung der Operationen, die Sie mit einer Ressource für ein externes Volume durchführen können, wird der Code im folgenden Beispiel wie folgt ausgeführt:

  1. Ruft das my_external_volume externe Volume-Ressourcen-Objekt ab.

  2. Löscht das externe Volume.

  3. Stellt das externe Volumen wieder her.

my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()
Copy