Gerenciamento de recursos de carregamento e descarregamento de dados com Python

É possível usar o Python para gerenciar recursos de carregamento de dados no Snowflake, incluindo volumes externos, canais e estágios.

Pré-requisitos

Os exemplos neste tópico pressupõem que você tenha adicionado código para se conectar ao Snowflake e criar um objeto Root a partir do qual usar o Snowflake Python APIs.

Por exemplo, o seguinte código usa parâmetros de conexão definidos em um arquivo de configuração para criar uma conexão com o Snowflake:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

Usando o objeto Session resultante, o código cria um objeto Root para usar os tipos e métodos de API. Para obter mais informações, consulte Conexão ao Snowflake com o Snowflake Python APIs.

Gerenciamento de estágios

É possível gerenciar estágios Snowflake, que são locais de arquivos de dados no armazenamento em nuvem. Para uma visão geral dos estágios, consulte Visão geral do carregamento de dados.

O Snowflake Python APIs representa estágios com dois tipos separados:

  • Stage: Expõe as propriedades de um estágio, como seu nome, tipo de criptografia, credenciais e configurações da tabela de diretório.

  • StageResource: Expõe os métodos que você pode usar para buscar um objeto Stage correspondente, carregar e listar arquivos no estágio e descartar o estágio.

Criação de um estágio

Para criar um estágio, primeiro crie um objeto Stage e depois crie um objeto StageCollection a partir do objeto Root da API. Usando StageCollection.create, adicione o novo estágio ao Snowflake.

O código no exemplo a seguir cria um objeto Stage que representa um estágio nomeado my_stage com um tipo de criptografia de SNOWFLAKE_SSE (somente criptografia do lado do servidor):

from snowflake.core.stage import Stage, StageEncryption

my_stage = Stage(
  name="my_stage",
  encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
Copy

O código cria uma variável StageCollection chamada stages e usa StageCollection.create para criar um novo estágio no Snowflake.

Como obter detalhes do estágio

É possível obter informações sobre um estágio chamando o método StageResource.fetch, que retorna um objeto Stage.

O código no exemplo a seguir obtém informações sobre um estágio nomeado my_stage:

my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Copy

Estágios de listagem

É possível listar estágios usando o método StageCollection.iter, que retorna um iterador PagedIter de objetos Stage.

O código no exemplo a seguir lista os estágios cujo nome inclui o texto my e imprime o nome de cada um:

from snowflake.core.stage import StageCollection

stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%")  # returns a PagedIter[Stage]
for stage_obj in stage_iter:
  print(stage_obj.name)
Copy

Execução de operações de estágio

É possível executar operações comuns de estágio – como carregar um arquivo em um estágio e listar arquivos em um estágio – com um objeto StageResource.

Para demonstrar algumas operações que você pode realizar com um recurso de estágio, o código no exemplo a seguir faz o seguinte:

  1. Carrega um arquivo nomeado my-file.yaml para o estágio my_stage com as opções de compactação automática e substituição especificadas.

  2. Lista todos os arquivos no estágio para verificar se o arquivo foi carregado com sucesso.

  3. Descarta o estágio.

my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]

my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)

stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
  print(stageFile)

my_stage_res.drop()
Copy

Gerenciamento de canais

É possível gerenciar canais Snowflake, que são objetos Snowflake nomeados de primeira classe com uma instrução COPY INTO usada pelo Snowpipe para carregar dados de uma fila de ingestão em tabelas. Para uma visão geral dos canais, consulte Snowpipe.

O Snowflake Python APIs representa canais com dois tipos separados:

  • Pipe: Expõe as propriedades de um canal, como nome e a instrução COPY INTO a ser usada pelo Snowpipe.

  • PipeResource: Expõe métodos que você pode usar para buscar um objeto Pipe correspondente, atualizar o canal com arquivos de dados preparados e descartar o canal.

Criação de um canal

Para criar um canal, primeiro crie um objeto Pipe e depois crie um objeto PipeCollection a partir do objeto Root da API. Usando PipeCollection.create, adicione o novo canal ao Snowflake.

O código no exemplo a seguir cria um objeto Pipe que representa um canal nomeado my_pipe com a instrução COPY INTO especificada:

from snowflake.core.pipe import Pipe

my_pipe = Pipe(
  name="my_pipe",
  comment="creating my pipe",
  copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)

pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
Copy

O código cria uma variável PipeCollection chamada pipes e usa PipeCollection.create para criar um novo canal no Snowflake.

Como obter os detalhes do canal

É possível obter informações sobre um canal chamando o método PipeResource.fetch, que retorna um objeto Pipe.

O código no exemplo a seguir obtém informações sobre um canal nomeado my_pipe:

my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Copy

Listagem de canais

É possível listar canais usando o método PipeCollection.iter, que retorna um iterador PagedIter de objetos Pipe.

O código no exemplo a seguir lista canais cujos nomes começam com my e imprime o nome de cada um:

from snowflake.core.pipe import PipeCollection

pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%")  # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
  print(pipe_obj.name)
Copy

Execução de operações de canal

É possível executar operações comuns de canal – como atualizar um canal e descartar um canal – com um objeto PipeResource.

Nota

Atualmente, somente a funcionalidade REFRESH de ALTER PIPE é compatível.

Para demonstrar as operações que você pode realizar com um recurso de canal, o código no exemplo a seguir faz o seguinte:

  1. Busca o objeto de recurso do canal my_pipe.

  2. Atualiza o canal com arquivos de dados em estágio com o prefixo opcional especificado (ou caminho).

  3. Descarta o canal.

my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]

# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")

my_pipe_res.drop()
Copy

Gerenciamento de volumes externos

É possível gerenciar volumes externos, que são objetos Snowflake nomeados ao nível de conta, que você usa para conectar o Snowflake ao seu armazenamento em nuvem externo para tabelas Apache Iceberg™. Para obter mais informações, consulte a seção Volume externo de Tabelas Apache Iceberg™.

O Snowflake Python APIs representa volumes externos com dois tipos separados:

  • ExternalVolume: Expõe as propriedades de um volume externo, como nome e locais de armazenamento.

  • ExternalVolumeResource: Expõe métodos que você pode usar para buscar um objeto ExternalVolume correspondente e descartar e restaurar o volume externo.

Criação de um volume externo

Para criar um volume externo, primeiro crie um objeto ExternalVolume e, em seguida, crie um objeto ExternalVolumeCollection a partir do objeto Root da API. Usando ExternalVolumeCollection.create, adicione o novo volume externo ao Snowflake.

O código no exemplo a seguir cria um objeto ExternalVolume que representa um volume externo nomeado my_external_volume com os locais de armazenamento AWS S3 especificados:

from snowflake.core.external_volume import (
    ExternalVolume,
    StorageLocationS3,
)

my_external_volume = ExternalVolume(
    name="my_external_volume",
    storage_locations=[
        StorageLocationS3(
            name="my-s3-us-west-1",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
        StorageLocationS3(
            name="my-s3-us-west-2",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
    ]
)

root.external_volumes.create(my_external_volume)
Copy

Obtenção de detalhes de volume externo

É possível obter informações sobre um volume externo chamando o método ExternalVolumeResource.fetch, que retorna um objeto ExternalVolume.

O código no exemplo a seguir obtém informações sobre um volume externo nomeado my_external_volume:

my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Copy

Listagem de volumes externos

É possível listar volumes externos usando o método ExternalVolumeCollection.iter, que retorna um iterador PagedIter de objetos ExternalVolume.

O código no exemplo a seguir lista volumes externos cujos nomes começam com my e imprime o nome de cada um:

external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
  print(external_volume_obj.name)
Copy

Execução de operações de volume externo

É possível executar operações comuns de volume externo – como descartar e restaurar um volume externo – com um objeto ExternalVolumeResource.

Para demonstrar as operações que você pode realizar com um recurso de volume externo, o código no exemplo a seguir faz o seguinte:

  1. Busca o objeto de recurso de volume externo my_external_volume.

  2. Descarta o volume externo.

  3. Restaura o volume externo.

my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()
Copy