Gerenciamento de fluxos Snowflake com Python¶
É possível usar Python para gerenciar fluxos Snowflake, que são objetos que registram alterações de linguagem de manipulação de dados (DML) feitas em tabelas, incluindo inserções, atualizações e exclusões, bem como metadados sobre cada alteração. Para obter mais informações, consulte Introdução a fluxos.
Nota
ALTER STREAM não é compatível atualmente.
O Snowflake Python APIs representa fluxos com dois tipos separados:
Stream
: Expõe as propriedades de um fluxo, como nome, atraso de destino, warehouse e instrução de consulta.StreamResource
: Expõe métodos que podem ser usados para buscar um objetoStream
correspondente, suspender e retomar o fluxo, bem como descartar o fluxo.
Pré-requisitos¶
Os exemplos neste tópico pressupõem que você tenha adicionado código para se conectar ao Snowflake e criar um objeto Root
a partir do qual usar o Snowflake Python APIs.
Por exemplo, o seguinte código usa parâmetros de conexão definidos em um arquivo de configuração para criar uma conexão com o Snowflake:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Usando o objeto Session
resultante, o código cria um objeto Root
para usar os tipos e métodos de API. Para obter mais informações, consulte Conexão ao Snowflake com o Snowflake Python APIs.
Criação de um fluxo¶
Para criar um fluxo, primeiro crie um objeto Stream
e, em seguida, crie um objeto StreamCollection
a partir do objeto Root
da API. Usando StreamCollection.create
, adicione o novo fluxo ao Snowflake.
É possível criar um fluxo nos seguintes tipos de objeto:
Tabelas padrão
Exibições
Tabelas de diretório
Em uma tabela de origem¶
O código no exemplo a seguir cria um objeto Stream
que representa um fluxo nomeado my_stream_on_table
na tabela de origem my_table
no banco de dados my_db
e no esquema my_schema
, com as propriedades de fluxo especificadas:
Nota
O tipo StreamSourceTable
oferece suporte apenas a tabelas padrão. Outros tipos de tabelas – como tabelas dinâmicas, tabelas de evento, tabelas externas e tabelas Iceberg – não são compatíveis no momento.
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable
stream_on_table = Stream(
"my_stream_on_table",
StreamSourceTable(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_table',
append_only = True,
show_initial_rows = False,
),
comment = 'create stream on table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
O código cria uma variável StreamCollection
chamada streams
e usa StreamCollection.create
para criar um novo fluxo no Snowflake.
Em uma exibição de origem¶
O código no exemplo a seguir cria um objeto Stream
que representa um fluxo nomeado my_stream_on_view
na exibição de origem my_view
no banco de dados my_db
e no esquema my_schema
, com as propriedades de fluxo especificadas:
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView
stream_on_view = Stream(
"my_stream_on_view",
StreamSourceView(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_view',
),
comment = 'create stream on view'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
Em uma tabela de diretório de origem¶
O código no exemplo a seguir cria um objeto Stream
que representa um fluxo nomeado my_stream_on_directory_table
na tabela de diretório de origem my_directory_table
no banco de dados my_db
e no esquema my_schema
, com as propriedades de fluxo especificadas:
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage
stream_on_directory_table = Stream(
"my_stream_on_directory_table",
StreamSourceStage(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_directory_table',
),
comment = 'create stream on directory table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
Clonagem de um fluxo¶
O código no exemplo a seguir cria um novo fluxo nomeado my_stream
com a mesma definição do fluxo de origem my_other_stream
no banco de dados my_db
e no esquema my_schema
:
from snowflake.core.stream import Stream
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
Como obter detalhes do fluxo¶
É possível obter informações sobre um fluxo chamando o método StreamResource.fetch
, que retorna um objeto Stream
.
O código no exemplo a seguir obtém informações sobre um fluxo nomeado my_stream
no banco de dados my_db
e no esquema my_schema
:
stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
Listagem de fluxos¶
É possível listar fluxos usando o método StreamCollection.iter
, que retorna um iterador PagedIter
de objetos Stream
.
O código no exemplo a seguir lista os fluxos cujos nomes começam com my
no banco de dados my_db
e no esquema my_schema
e, em seguida, imprime o nome de cada um:
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
print(stream_obj.name)
O código no exemplo a seguir também lista fluxos cujos nomes começam com my
, mas usa o parâmetro starts_with
em vez de like
. Este exemplo também define o parâmetro opcional show_limit=10
para limitar o número de resultados para 10
:
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
print(stream_obj.name)
Descarte de um fluxo¶
É possível descartar um fluxo com um objeto StreamResource
.
O código no exemplo a seguir busca o objeto de recurso de fluxo my_stream
e então descarta o fluxo.
my_stream_res = root.streams["my_stream"]
my_stream_res.drop()