Python을 사용하여 Snowflake 스트림 관리하기

Python을 사용하여 삽입, 업데이트, 삭제 등 테이블에 대한 데이터 조작 언어(DML) 변경 사항과 각 변경 사항에 대한 메타데이터를 기록하는 오브젝트인 Snowflake 스트림을 관리할 수 있습니다. 자세한 내용은 스트림 소개 섹션을 참조하십시오.

참고

ALTER STREAM 은 현재 지원되지 않습니다.

Snowflake Python APIs 은 다음 두 가지 유형의 스트림을 나타냅니다.

  • Stream: 스트림의 이름, 목표 지연, 웨어하우스, 쿼리 문과 같은 속성을 노출합니다.

  • StreamResource: 해당 Stream 오브젝트 가져오기, 스트림 일시 중단 및 재개, 스트림 삭제에 사용할 수 있는 메서드를 노출합니다.

전제 조건

이 항목의 예제에서는 Snowflake와 연결하고 Snowflake Python APIs 을 사용할 수 있는 Root 오브젝트를 생성하는 코드를 추가했다고 가정합니다.

예를 들어, 다음 코드는 구성 파일에 정의된 연결 매개 변수를 사용하여 Snowflake에 대한 연결을 생성합니다.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

해당 코드에서는 결과 Session 오브젝트를 사용하여 API의 유형과 메서드를 사용하기 위해 Root 오브젝트를 생성합니다. 자세한 내용은 Snowflake Python APIs 을 사용하여 Snowflake에 연결 섹션을 참조하십시오.

스트림 만들기

스트림을 생성하려면 먼저 Stream 오브젝트를 생성한 다음 API Root 오브젝트에서 StreamCollection 오브젝트를 생성합니다. StreamCollection.create 를 사용하여 Snowflake에 새 스트림을 추가합니다.

다음 오브젝트 유형에서 스트림을 생성할 수 있습니다.

  • 표준 테이블

  • 디렉터리 테이블

소스 테이블에서

다음 예제의 코드는 my_db 데이터베이스의 소스 테이블 my_tablemy_schema 스키마에 지정된 스트림 속성을 가진 이름이 my_stream_on_table 인 스트림을 나타내는 Stream 오브젝트를 생성합니다.

참고

StreamSourceTable 유형은 표준 테이블만 지원합니다. 동적 테이블, 이벤트 테이블, 외부 테이블, Iceberg 테이블 등 다른 유형의 테이블은 현재 지원되지 않습니다.

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable

stream_on_table = Stream(
  "my_stream_on_table",
  StreamSourceTable(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_table',
      append_only = True,
      show_initial_rows = False,
  ),
  comment = 'create stream on table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
Copy

이 코드는 StreamCollection 변수 streams 를 생성하고 StreamCollection.create 를 사용하여 Snowflake에 새 스트림을 생성합니다.

소스 뷰에서

다음 예제의 코드는 my_db 데이터베이스의 소스 뷰 my_viewmy_schema 스키마에 지정된 스트림 속성을 가진 이름이 my_stream_on_view 인 스트림을 나타내는 Stream 오브젝트를 생성합니다.

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView

stream_on_view = Stream(
  "my_stream_on_view",
  StreamSourceView(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_view',
  ),
  comment = 'create stream on view'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
Copy

소스 디렉터리 테이블에서

다음 예제의 코드는 my_db 데이터베이스의 소스 디렉터리 테이블 my_directory_tablemy_schema 스키마에 지정된 스트림 속성을 가진 이름이 my_stream_on_directory_table 인 스트림을 나타내는 Stream 오브젝트를 생성합니다.

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage

stream_on_directory_table = Stream(
  "my_stream_on_directory_table",
  StreamSourceStage(
      point_of_time = PointOfTimeOffset(reference="before", offset="1"),
      name = 'my_directory_table',
  ),
  comment = 'create stream on directory table'
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
Copy

스트림 복제하기

다음 예제의 코드는 my_db 데이터베이스 및 my_schema 스키마에서 소스 스트림 my_other_stream 과 정의가 동일한 이름이 my_stream 인 새 스트림을 생성합니다.

from snowflake.core.stream import Stream

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
Copy

스트림 세부 정보 가져오기

Stream 오브젝트를 반환하는 StreamResource.fetch 메서드를 호출하여 스트림에 대한 정보를 얻을 수 있습니다.

다음 예제의 코드는 my_db 데이터베이스와 my_schema 스키마에서 이름이 my_stream 인 스트림에 대한 정보를 가져옵니다.

stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
Copy

스트림 나열하기

Stream 오브젝트의 PagedIter 반복기를 반환하는 StreamCollection.iter 메서드를 사용하여 스트림을 나열할 수 있습니다.

다음 예제의 코드는 my_db 데이터베이스와 my_schema 스키마에서 이름이 my 로 시작하는 스트림을 나열한 다음 각각의 이름을 출력합니다.

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

다음 예제의 코드에도 이름이 my 로 시작하는 스트림이 목록에 표시되지만, like 대신 starts_with 매개 변수를 사용합니다. 이 예제에서는 선택적 매개 변수 show_limit=10 를 설정하여 결과 수를 10 로 제한합니다.

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
  print(stream_obj.name)
Copy

스트림 삭제하기

StreamResource 오브젝트가 포함된 스트림을 삭제할 수 있습니다.

다음 예제의 코드는 my_stream 스트림 리소스 오브젝트를 가져온 다음 스트림을 삭제합니다.

my_stream_res = root.streams["my_stream"]
my_stream_res.drop()
Copy