Python을 사용하여 Snowflake 스트림 관리하기¶
Python을 사용하여 삽입, 업데이트, 삭제 등 테이블에 대한 데이터 조작 언어(DML) 변경 사항과 각 변경 사항에 대한 메타데이터를 기록하는 오브젝트인 Snowflake 스트림을 관리할 수 있습니다. 자세한 내용은 스트림 소개 섹션을 참조하십시오.
참고
ALTER STREAM 은 현재 지원되지 않습니다.
Snowflake Python APIs 은 다음 두 가지 유형의 스트림을 나타냅니다.
Stream
: 스트림의 이름, 목표 지연, 웨어하우스, 쿼리 문과 같은 속성을 노출합니다.StreamResource
: 해당Stream
오브젝트 가져오기, 스트림 일시 중단 및 재개, 스트림 삭제에 사용할 수 있는 메서드를 노출합니다.
전제 조건¶
이 항목의 예제에서는 Snowflake와 연결하고 Snowflake Python APIs 을 사용할 수 있는 Root
오브젝트를 생성하는 코드를 추가했다고 가정합니다.
예를 들어, 다음 코드는 구성 파일에 정의된 연결 매개 변수를 사용하여 Snowflake에 대한 연결을 생성합니다.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
해당 코드에서는 결과 Session
오브젝트를 사용하여 API의 유형과 메서드를 사용하기 위해 Root
오브젝트를 생성합니다. 자세한 내용은 Snowflake Python APIs 을 사용하여 Snowflake에 연결 섹션을 참조하십시오.
스트림 만들기¶
스트림을 생성하려면 먼저 Stream
오브젝트를 생성한 다음 API Root
오브젝트에서 StreamCollection
오브젝트를 생성합니다. StreamCollection.create
를 사용하여 Snowflake에 새 스트림을 추가합니다.
다음 오브젝트 유형에서 스트림을 생성할 수 있습니다.
표준 테이블
뷰
디렉터리 테이블
소스 테이블에서¶
다음 예제의 코드는 my_db
데이터베이스의 소스 테이블 my_table
및 my_schema
스키마에 지정된 스트림 속성을 가진 이름이 my_stream_on_table
인 스트림을 나타내는 Stream
오브젝트를 생성합니다.
참고
StreamSourceTable
유형은 표준 테이블만 지원합니다. 동적 테이블, 이벤트 테이블, 외부 테이블, Iceberg 테이블 등 다른 유형의 테이블은 현재 지원되지 않습니다.
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable
stream_on_table = Stream(
"my_stream_on_table",
StreamSourceTable(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_table',
append_only = True,
show_initial_rows = False,
),
comment = 'create stream on table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
이 코드는 StreamCollection
변수 streams
를 생성하고 StreamCollection.create
를 사용하여 Snowflake에 새 스트림을 생성합니다.
소스 뷰에서¶
다음 예제의 코드는 my_db
데이터베이스의 소스 뷰 my_view
및 my_schema
스키마에 지정된 스트림 속성을 가진 이름이 my_stream_on_view
인 스트림을 나타내는 Stream
오브젝트를 생성합니다.
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView
stream_on_view = Stream(
"my_stream_on_view",
StreamSourceView(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_view',
),
comment = 'create stream on view'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
소스 디렉터리 테이블에서¶
다음 예제의 코드는 my_db
데이터베이스의 소스 디렉터리 테이블 my_directory_table
및 my_schema
스키마에 지정된 스트림 속성을 가진 이름이 my_stream_on_directory_table
인 스트림을 나타내는 Stream
오브젝트를 생성합니다.
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage
stream_on_directory_table = Stream(
"my_stream_on_directory_table",
StreamSourceStage(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_directory_table',
),
comment = 'create stream on directory table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
스트림 복제하기¶
다음 예제의 코드는 my_db
데이터베이스 및 my_schema
스키마에서 소스 스트림 my_other_stream
과 정의가 동일한 이름이 my_stream
인 새 스트림을 생성합니다.
from snowflake.core.stream import Stream
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
스트림 세부 정보 가져오기¶
Stream
오브젝트를 반환하는 StreamResource.fetch
메서드를 호출하여 스트림에 대한 정보를 얻을 수 있습니다.
다음 예제의 코드는 my_db
데이터베이스와 my_schema
스키마에서 이름이 my_stream
인 스트림에 대한 정보를 가져옵니다.
stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
스트림 나열하기¶
Stream
오브젝트의 PagedIter
반복기를 반환하는 StreamCollection.iter
메서드를 사용하여 스트림을 나열할 수 있습니다.
다음 예제의 코드는 my_db
데이터베이스와 my_schema
스키마에서 이름이 my
로 시작하는 스트림을 나열한 다음 각각의 이름을 출력합니다.
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
print(stream_obj.name)
다음 예제의 코드에도 이름이 my
로 시작하는 스트림이 목록에 표시되지만, like
대신 starts_with
매개 변수를 사용합니다. 이 예제에서는 선택적 매개 변수 show_limit=10
를 설정하여 결과 수를 10
로 제한합니다.
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
print(stream_obj.name)
스트림 삭제하기¶
StreamResource
오브젝트가 포함된 스트림을 삭제할 수 있습니다.
다음 예제의 코드는 my_stream
스트림 리소스 오브젝트를 가져온 다음 스트림을 삭제합니다.
my_stream_res = root.streams["my_stream"]
my_stream_res.drop()