Gerenciamento de recursos de carregamento e descarregamento de dados com Python¶
É possível usar o Python para gerenciar recursos de carregamento de dados no Snowflake, incluindo volumes externos, canais e estágios.
Pré-requisitos¶
Os exemplos neste tópico pressupõem que você tenha adicionado código para se conectar ao Snowflake e criar um objeto Root
a partir do qual usar o Snowflake Python APIs.
Por exemplo, o seguinte código usa parâmetros de conexão definidos em um arquivo de configuração para criar uma conexão com o Snowflake:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Usando o objeto Session
resultante, o código cria um objeto Root
para usar os tipos e métodos de API. Para obter mais informações, consulte Conexão ao Snowflake com o Snowflake Python APIs.
Gerenciamento de estágios¶
É possível gerenciar estágios Snowflake, que são locais de arquivos de dados no armazenamento em nuvem. Para uma visão geral dos estágios, consulte Visão geral do carregamento de dados.
O Snowflake Python APIs representa estágios com dois tipos separados:
Stage
: Expõe as propriedades de um estágio, como seu nome, tipo de criptografia, credenciais e configurações da tabela de diretório.StageResource
: Expõe os métodos que você pode usar para buscar um objetoStage
correspondente, carregar e listar arquivos no estágio e descartar o estágio.
Criação de um estágio¶
Para criar um estágio, primeiro crie um objeto Stage
e depois crie um objeto StageCollection
a partir do objeto Root
da API. Usando StageCollection.create
, adicione o novo estágio ao Snowflake.
O código no exemplo a seguir cria um objeto Stage
que representa um estágio nomeado my_stage
com um tipo de criptografia de SNOWFLAKE_SSE
(somente criptografia do lado do servidor):
from snowflake.core.stage import Stage, StageEncryption
my_stage = Stage(
name="my_stage",
encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
O código cria uma variável StageCollection
chamada stages
e usa StageCollection.create
para criar um novo estágio no Snowflake.
Como obter detalhes do estágio¶
É possível obter informações sobre um estágio chamando o método StageResource.fetch
, que retorna um objeto Stage
.
O código no exemplo a seguir obtém informações sobre um estágio nomeado my_stage
:
my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Estágios de listagem¶
É possível listar estágios usando o método StageCollection.iter
, que retorna um iterador PagedIter
de objetos Stage
.
O código no exemplo a seguir lista os estágios cujo nome inclui o texto my
e imprime o nome de cada um:
from snowflake.core.stage import StageCollection
stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%") # returns a PagedIter[Stage]
for stage_obj in stage_iter:
print(stage_obj.name)
Execução de operações de estágio¶
É possível executar operações comuns de estágio – como carregar um arquivo em um estágio e listar arquivos em um estágio – com um objeto StageResource
.
Para demonstrar algumas operações que você pode realizar com um recurso de estágio, o código no exemplo a seguir faz o seguinte:
Carrega um arquivo nomeado
my-file.yaml
para o estágiomy_stage
com as opções de compactação automática e substituição especificadas.Lista todos os arquivos no estágio para verificar se o arquivo foi carregado com sucesso.
Descarta o estágio.
my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]
my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)
stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
print(stageFile)
my_stage_res.drop()
Gerenciamento de canais¶
É possível gerenciar canais Snowflake, que são objetos Snowflake nomeados de primeira classe com uma instrução COPY INTO usada pelo Snowpipe para carregar dados de uma fila de ingestão em tabelas. Para uma visão geral dos canais, consulte Snowpipe.
O Snowflake Python APIs representa canais com dois tipos separados:
Pipe
: Expõe as propriedades de um canal, como nome e a instrução COPY INTO a ser usada pelo Snowpipe.PipeResource
: Expõe métodos que você pode usar para buscar um objetoPipe
correspondente, atualizar o canal com arquivos de dados preparados e descartar o canal.
Criação de um canal¶
Para criar um canal, primeiro crie um objeto Pipe
e depois crie um objeto PipeCollection
a partir do objeto Root
da API. Usando PipeCollection.create
, adicione o novo canal ao Snowflake.
O código no exemplo a seguir cria um objeto Pipe
que representa um canal nomeado my_pipe
com a instrução COPY INTO especificada:
from snowflake.core.pipe import Pipe
my_pipe = Pipe(
name="my_pipe",
comment="creating my pipe",
copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)
pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
O código cria uma variável PipeCollection
chamada pipes
e usa PipeCollection.create
para criar um novo canal no Snowflake.
Como obter os detalhes do canal¶
É possível obter informações sobre um canal chamando o método PipeResource.fetch
, que retorna um objeto Pipe
.
O código no exemplo a seguir obtém informações sobre um canal nomeado my_pipe
:
my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Listagem de canais¶
É possível listar canais usando o método PipeCollection.iter
, que retorna um iterador PagedIter
de objetos Pipe
.
O código no exemplo a seguir lista canais cujos nomes começam com my
e imprime o nome de cada um:
from snowflake.core.pipe import PipeCollection
pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%") # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
print(pipe_obj.name)
Execução de operações de canal¶
É possível executar operações comuns de canal – como atualizar um canal e descartar um canal – com um objeto PipeResource
.
Nota
Atualmente, somente a funcionalidade REFRESH de ALTER PIPE é compatível.
Para demonstrar as operações que você pode realizar com um recurso de canal, o código no exemplo a seguir faz o seguinte:
Busca o objeto de recurso do canal
my_pipe
.Atualiza o canal com arquivos de dados em estágio com o prefixo opcional especificado (ou caminho).
Descarta o canal.
my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]
# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")
my_pipe_res.drop()
Gerenciamento de volumes externos¶
É possível gerenciar volumes externos, que são objetos Snowflake nomeados ao nível de conta, que você usa para conectar o Snowflake ao seu armazenamento em nuvem externo para tabelas Apache Iceberg™. Para obter mais informações, consulte a seção Volume externo de Tabelas Apache Iceberg™.
O Snowflake Python APIs representa volumes externos com dois tipos separados:
ExternalVolume
: Expõe as propriedades de um volume externo, como nome e locais de armazenamento.ExternalVolumeResource
: Expõe métodos que você pode usar para buscar um objetoExternalVolume
correspondente e descartar e restaurar o volume externo.
Criação de um volume externo¶
Para criar um volume externo, primeiro crie um objeto ExternalVolume
e, em seguida, crie um objeto ExternalVolumeCollection
a partir do objeto Root
da API. Usando ExternalVolumeCollection.create
, adicione o novo volume externo ao Snowflake.
O código no exemplo a seguir cria um objeto ExternalVolume
que representa um volume externo nomeado my_external_volume
com os locais de armazenamento AWS S3 especificados:
from snowflake.core.external_volume import (
ExternalVolume,
StorageLocationS3,
)
my_external_volume = ExternalVolume(
name="my_external_volume",
storage_locations=[
StorageLocationS3(
name="my-s3-us-west-1",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
StorageLocationS3(
name="my-s3-us-west-2",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
]
)
root.external_volumes.create(my_external_volume)
Obtenção de detalhes de volume externo¶
É possível obter informações sobre um volume externo chamando o método ExternalVolumeResource.fetch
, que retorna um objeto ExternalVolume
.
O código no exemplo a seguir obtém informações sobre um volume externo nomeado my_external_volume
:
my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Listagem de volumes externos¶
É possível listar volumes externos usando o método ExternalVolumeCollection.iter
, que retorna um iterador PagedIter
de objetos ExternalVolume
.
O código no exemplo a seguir lista volumes externos cujos nomes começam com my
e imprime o nome de cada um:
external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
print(external_volume_obj.name)
Execução de operações de volume externo¶
É possível executar operações comuns de volume externo – como descartar e restaurar um volume externo – com um objeto ExternalVolumeResource
.
Para demonstrar as operações que você pode realizar com um recurso de volume externo, o código no exemplo a seguir faz o seguinte:
Busca o objeto de recurso de volume externo
my_external_volume
.Descarta o volume externo.
Restaura o volume externo.
my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()