Gerenciamento de fluxos Snowflake com Python

É possível usar Python para gerenciar fluxos Snowflake, que são objetos que registram alterações de linguagem de manipulação de dados (DML) feitas em tabelas, incluindo inserções, atualizações e exclusões, bem como metadados sobre cada alteração. Para obter mais informações, consulte Introdução a fluxos.

Nota

ALTER STREAM não é compatível atualmente.

O Snowflake Python APIs representa fluxos com dois tipos separados:

  • Stream: Expõe as propriedades de um fluxo, como nome, atraso de destino, warehouse e instrução de consulta.

  • StreamResource: Expõe métodos que podem ser usados para buscar um objeto Stream correspondente, suspender e retomar o fluxo, bem como descartar o fluxo.

Pré-requisitos

Os exemplos neste tópico pressupõem que você tenha adicionado código para se conectar ao Snowflake e criar um objeto Root a partir do qual usar o Snowflake Python APIs.

Por exemplo, o seguinte código usa parâmetros de conexão definidos em um arquivo de configuração para criar uma conexão com o Snowflake:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

Usando o objeto Session resultante, o código cria um objeto Root para usar os tipos e métodos de API. Para obter mais informações, consulte Conexão ao Snowflake com o Snowflake Python APIs.

Criação de um fluxo

Para criar um fluxo, primeiro crie um objeto Stream e, em seguida, crie um objeto StreamCollection a partir do objeto Root da API. Usando StreamCollection.create, adicione o novo fluxo ao Snowflake.

É possível criar um fluxo nos seguintes tipos de objeto:

  • Tabelas padrão

  • Exibições

  • Tabelas de diretório

Em uma tabela de origem

O código no exemplo a seguir cria um objeto Stream que representa um fluxo nomeado my_stream_on_table na tabela de origem my_table no banco de dados my_db e no esquema my_schema, com as propriedades de fluxo especificadas:

Nota

O tipo StreamSourceTable oferece suporte apenas a tabelas padrão. Outros tipos de tabelas – como tabelas dinâmicas, tabelas de evento, tabelas externas e tabelas Iceberg – não são compatíveis no momento.

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable

stream_on_table = Stream(
  "my_stream_on_table",
  StreamSourceTable(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_table',
      append_only = True,
      show_initial_rows = False,
  ),
  comment = 'create stream on table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
Copy

O código cria uma variável StreamCollection chamada streams e usa StreamCollection.create para criar um novo fluxo no Snowflake.

Em uma exibição de origem

O código no exemplo a seguir cria um objeto Stream que representa um fluxo nomeado my_stream_on_view na exibição de origem my_view no banco de dados my_db e no esquema my_schema, com as propriedades de fluxo especificadas:

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView

stream_on_view = Stream(
  "my_stream_on_view",
  StreamSourceView(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_view',
  ),
  comment = 'create stream on view'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
Copy

Em uma tabela de diretório de origem

O código no exemplo a seguir cria um objeto Stream que representa um fluxo nomeado my_stream_on_directory_table na tabela de diretório de origem my_directory_table no banco de dados my_db e no esquema my_schema, com as propriedades de fluxo especificadas:

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage

stream_on_directory_table = Stream(
  "my_stream_on_directory_table",
  StreamSourceStage(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_directory_table',
  ),
  comment = 'create stream on directory table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
Copy

Clonagem de um fluxo

O código no exemplo a seguir cria um novo fluxo nomeado my_stream com a mesma definição do fluxo de origem my_other_stream no banco de dados my_db e no esquema my_schema:

from snowflake.core.stream import Stream

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
Copy

Como obter detalhes do fluxo

É possível obter informações sobre um fluxo chamando o método StreamResource.fetch, que retorna um objeto Stream.

O código no exemplo a seguir obtém informações sobre um fluxo nomeado my_stream no banco de dados my_db e no esquema my_schema:

stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
Copy

Listagem de fluxos

É possível listar fluxos usando o método StreamCollection.iter, que retorna um iterador PagedIter de objetos Stream.

O código no exemplo a seguir lista os fluxos cujos nomes começam com my no banco de dados my_db e no esquema my_schema e, em seguida, imprime o nome de cada um:

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

O código no exemplo a seguir também lista fluxos cujos nomes começam com my, mas usa o parâmetro starts_with em vez de like. Este exemplo também define o parâmetro opcional show_limit=10 para limitar o número de resultados para 10:

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

Descarte de um fluxo

É possível descartar um fluxo com um objeto StreamResource.

O código no exemplo a seguir busca o objeto de recurso de fluxo my_stream e então descarta o fluxo.

my_stream_res = root.streams["my_stream"]
my_stream_res.drop()
Copy