Managing data loading and unloading resources with Python¶
You can use Python to manage data loading and unloading resources in Snowflake, including external volumes, pipes, and stages.
Prerequisites¶
The examples in this topic assume that you’ve added code to connect with Snowflake and to create a Root
object from which to use the
Snowflake Python APIs.
For example, the following code uses connection parameters defined in a configuration file to create a connection to Snowflake:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Using the resulting Session
object, the code creates a Root
object to use the API’s types and methods. For more information,
see Connect to Snowflake with the Snowflake Python APIs.
Managing stages¶
You can manage Snowflake stages, which are locations of data files in cloud storage. For an overview of stages, see Overview of data loading.
The Snowflake Python APIs represents stages with two separate types:
Stage
: Exposes a stage’s properties such as its name, encryption type, credentials, and directory table settings.StageResource
: Exposes methods you can use to fetch a correspondingStage
object, upload and list files on the stage, and drop the stage.
Creating a stage¶
To create a stage, first create a Stage
object, and then create a StageCollection
object from the API Root
object. Using StageCollection.create
, add the new stage to Snowflake.
Code in the following example creates a Stage
object that represents a stage named my_stage
with an encryption type of
SNOWFLAKE_SSE
(server-side encryption only):
from snowflake.core.stage import Stage, StageEncryption
my_stage = Stage(
name="my_stage",
encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
The code creates a StageCollection
variable stages
and uses StageCollection.create
to create a new stage in Snowflake.
Getting stage details¶
You can get information about a stage by calling the StageResource.fetch
method, which returns a Stage
object.
Code in the following example gets information about a stage named my_stage
:
my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Listing stages¶
You can list stages using the StageCollection.iter
method, which returns a PagedIter
iterator of Stage
objects.
Code in the following example lists stages whose name includes the text my
and prints the name of each:
from snowflake.core.stage import StageCollection
stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%") # returns a PagedIter[Stage]
for stage_obj in stage_iter:
print(stage_obj.name)
Performing stage operations¶
You can perform common stage operations—such as uploading a file to a stage and listing files on a stage—with a StageResource
object.
Note
Starting with version 0.12.0 of the API, the following StageResource
methods have been renamed. This renaming is done to
align the Python API with the SQL API more consistently.
The old method names are now aliases that call the new method names, so the old method names will still work as expected.
Old method name |
New method name |
---|---|
|
|
|
|
To demonstrate some operations you can do with a stage resource, code in the following example does the following:
Uploads a file named
my-file.yaml
to themy_stage
stage with the specified auto-compress and overwrite options.Lists all files on the stage to verify that the file was uploaded successfully.
Drops the stage.
my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]
my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)
stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
print(stageFile)
my_stage_res.drop()
Managing pipes¶
You can manage Snowflake pipes, which are named, first-class Snowflake objects that contain a COPY INTO statement used by Snowpipe to load data from an ingestion queue into tables. For an overview of pipes, see Snowpipe.
The Snowflake Python APIs represents pipes with two separate types:
Pipe
: Exposes a pipe’s properties such as its name and the COPY INTO statement to be used by Snowpipe.PipeResource
: Exposes methods you can use to fetch a correspondingPipe
object, refresh the pipe with staged data files, and drop the pipe.
Creating a pipe¶
To create a pipe, first create a Pipe
object, and then create a PipeCollection
object from the API Root
object. Using PipeCollection.create
, add the new pipe to Snowflake.
Code in the following example creates a Pipe
object that represents a pipe named my_pipe
with the specified COPY INTO
statement:
from snowflake.core.pipe import Pipe
my_pipe = Pipe(
name="my_pipe",
comment="creating my pipe",
copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)
pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
The code creates a PipeCollection
variable pipes
and uses PipeCollection.create
to create a new pipe in Snowflake.
Getting pipe details¶
You can get information about a pipe by calling the PipeResource.fetch
method, which returns a Pipe
object.
Code in the following example gets information about a pipe named my_pipe
:
my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Listing pipes¶
You can list pipes using the PipeCollection.iter
method, which returns a PagedIter
iterator of Pipe
objects.
Code in the following example lists pipes whose name starts with my
and prints the name of each:
from snowflake.core.pipe import PipeCollection
pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%") # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
print(pipe_obj.name)
Performing pipe operations¶
You can perform common pipe operations—such as refreshing a pipe and dropping a pipe—with a PipeResource
object.
Note
Only the REFRESH functionality of ALTER PIPE is currently supported.
To demonstrate operations you can do with a pipe resource, code in the following example does the following:
Fetches the
my_pipe
pipe resource object.Refreshes the pipe with staged data files with the specified, optional prefix (or path).
Drops the pipe.
my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]
# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")
my_pipe_res.drop()
Managing external volumes¶
You can manage external volumes, which are named, account-level Snowflake objects that you use to connect Snowflake to your external cloud storage for Apache Iceberg™ tables. For more information, see the External volume section of Apache Iceberg™ tables.
The Snowflake Python APIs represents external volumes with two separate types:
ExternalVolume
: Exposes an external volume’s properties such as its name and storage locations.ExternalVolumeResource
: Exposes methods you can use to fetch a correspondingExternalVolume
object, and drop and undrop the external volume.
Creating an external volume¶
To create an external volume, first create an ExternalVolume
object, and then create an ExternalVolumeCollection
object
from the API Root
object. Using ExternalVolumeCollection.create
, add the new external volume to Snowflake.
Code in the following example creates an ExternalVolume
object that represents an external volume named my_external_volume
with the specified AWS S3 storage locations:
from snowflake.core.external_volume import (
ExternalVolume,
StorageLocationS3,
)
my_external_volume = ExternalVolume(
name="my_external_volume",
storage_locations=[
StorageLocationS3(
name="my-s3-us-west-1",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
StorageLocationS3(
name="my-s3-us-west-2",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
]
)
root.external_volumes.create(my_external_volume)
Getting external volume details¶
You can get information about an external volume by calling the ExternalVolumeResource.fetch
method, which returns an
ExternalVolume
object.
Code in the following example gets information about an external volume named my_external_volume
:
my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Listing external volumes¶
You can list external volumes using the ExternalVolumeCollection.iter
method, which returns a PagedIter
iterator of
ExternalVolume
objects.
Code in the following example lists external volumes whose name starts with my
and prints the name of each:
external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
print(external_volume_obj.name)
Performing external volume operations¶
You can perform common external volume operations—such as dropping and undropping an external volume—with an
ExternalVolumeResource
object.
To demonstrate operations you can do with an external volume resource, code in the following example does the following:
Fetches the
my_external_volume
external volume resource object.Drops the external volume.
Undrops the external volume.
my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()