Gerenciamento de fluxos Snowflake com Python¶
É possível usar Python para gerenciar fluxos Snowflake, que são objetos que registram alterações de linguagem de manipulação de dados (DML) feitas em tabelas, incluindo inserções, atualizações e exclusões, bem como metadados sobre cada alteração. Para obter mais informações, consulte Introdução a fluxos.
Nota
ALTER STREAM não é compatível atualmente.
O Snowflake Python APIs representa fluxos com dois tipos separados:
Stream: Expõe as propriedades de um fluxo, como nome, atraso de destino, warehouse e instrução de consulta.StreamResource: Expõe métodos que podem ser usados para buscar um objetoStreamcorrespondente, suspender e retomar o fluxo, bem como descartar o fluxo.
Pré-requisitos¶
Os exemplos neste tópico pressupõem que você tenha adicionado código para se conectar ao Snowflake e criar um objeto Root a partir do qual usar o Snowflake Python APIs.
Por exemplo, o seguinte código usa parâmetros de conexão definidos em um arquivo de configuração para criar uma conexão com o Snowflake:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Usando o objeto Session resultante, o código cria um objeto Root para usar os tipos e métodos de API. Para obter mais informações, consulte Conexão ao Snowflake com o Snowflake Python APIs.
Criação de um fluxo¶
Para criar um fluxo, primeiro crie um objeto Stream e, em seguida, crie um objeto StreamCollection a partir do objeto Root da API. Usando StreamCollection.create, adicione o novo fluxo ao Snowflake.
É possível criar um fluxo nos seguintes tipos de objeto:
Tabelas padrão
Exibições
Tabelas de diretório
Em uma tabela de origem¶
O código no exemplo a seguir cria um objeto Stream que representa um fluxo nomeado my_stream_on_table na tabela de origem my_table no banco de dados my_db e no esquema my_schema, com as propriedades de fluxo especificadas:
Nota
O tipo StreamSourceTable oferece suporte apenas a tabelas padrão. Outros tipos de tabelas – como tabelas dinâmicas, tabelas de evento, tabelas externas e tabelas Iceberg – não são compatíveis no momento.
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable
stream_on_table = Stream(
"my_stream_on_table",
StreamSourceTable(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_table',
append_only = True,
show_initial_rows = False,
),
comment = 'create stream on table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
O código cria uma variável StreamCollection chamada streams e usa StreamCollection.create para criar um novo fluxo no Snowflake.
Em uma exibição de origem¶
O código no exemplo a seguir cria um objeto Stream que representa um fluxo nomeado my_stream_on_view na exibição de origem my_view no banco de dados my_db e no esquema my_schema, com as propriedades de fluxo especificadas:
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView
stream_on_view = Stream(
"my_stream_on_view",
StreamSourceView(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_view',
),
comment = 'create stream on view'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
Em uma tabela de diretório de origem¶
O código no exemplo a seguir cria um objeto Stream que representa um fluxo nomeado my_stream_on_directory_table na tabela de diretório de origem my_directory_table no banco de dados my_db e no esquema my_schema, com as propriedades de fluxo especificadas:
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage
stream_on_directory_table = Stream(
"my_stream_on_directory_table",
StreamSourceStage(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_directory_table',
),
comment = 'create stream on directory table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
Clonagem de um fluxo¶
O código no exemplo a seguir cria um novo fluxo nomeado my_stream com a mesma definição do fluxo de origem my_other_stream no banco de dados my_db e no esquema my_schema:
from snowflake.core.stream import Stream
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
Como obter detalhes do fluxo¶
É possível obter informações sobre um fluxo chamando o método StreamResource.fetch, que retorna um objeto Stream.
O código no exemplo a seguir obtém informações sobre um fluxo nomeado my_stream no banco de dados my_db e no esquema my_schema:
stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
Listagem de fluxos¶
É possível listar fluxos usando o método StreamCollection.iter, que retorna um iterador PagedIter de objetos Stream.
O código no exemplo a seguir lista os fluxos cujos nomes começam com my no banco de dados my_db e no esquema my_schema e, em seguida, imprime o nome de cada um:
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
print(stream_obj.name)
O código no exemplo a seguir também lista fluxos cujos nomes começam com my, mas usa o parâmetro starts_with em vez de like. Este exemplo também define o parâmetro opcional show_limit=10 para limitar o número de resultados para 10:
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
print(stream_obj.name)
Descarte de um fluxo¶
É possível descartar um fluxo com um objeto StreamResource.
O código no exemplo a seguir obtém o objeto de recurso de fluxo my_stream e, em seguida, descarta o fluxo.
my_stream_res = root.streams["my_stream"]
my_stream_res.drop()