Python을 사용하여 Snowflake 동적 테이블 관리하기

Python을 사용하면 지속적인 처리 파이프라인을 위한 새로운 테이블 유형인 Snowflake 동적 테이블을 관리할 수 있습니다. 동적 테이블은 지정된 쿼리의 결과를 구체화합니다. 이 기능에 대한 개요는 동적 테이블 섹션을 참조하십시오.

Snowflake Python APIs 은 다음 두 가지 별개 유형의 동적 테이블을 나타냅니다.

  • DynamicTable: 동적 테이블의 속성(이름, 대상 지연, 웨어하우스, 쿼리 문 등)을 표시합니다.

  • DynamicTableResource: 해당 DynamicTable 오브젝트 가져오기, 동적 테이블 일시 중단 및 재개, 동적 테이블 삭제에 사용할 수 있는 메서드를 노출합니다.

전제 조건

이 항목의 예제에서는 Snowflake와 연결하고 Snowflake Python APIs 을 사용할 수 있는 Root 오브젝트를 생성하는 코드를 추가했다고 가정합니다.

예를 들어, 다음 코드는 구성 파일에 정의된 연결 매개 변수를 사용하여 Snowflake에 대한 연결을 생성합니다.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

해당 코드에서는 결과 Session 오브젝트를 사용하여 API의 유형과 메서드를 사용하기 위해 Root 오브젝트를 생성합니다. 자세한 내용은 Snowflake Python APIs 을 사용하여 Snowflake에 연결 섹션을 참조하십시오.

동적 테이블 만들기

동적 테이블을 생성하려면 먼저 DynamicTable 오브젝트를 생성한 다음 API Root 오브젝트에서 DynamicTableCollection 오브젝트를 생성합니다. DynamicTableCollection.create 를 사용하여 Snowflake에 새 동적 테이블을 추가합니다.

다음 예제의 코드는 최소한의 필수 옵션을 지정하여 my_db 데이터베이스 및 my_schema 스키마에서 my_dynamic_table 이라는 동적 테이블을 나타내는 DynamicTable 오브젝트를 생성합니다.

from snowflake.core.dynamic_table import DynamicTable, DownstreamLag

my_dt = DynamicTable(
  name='my_dynamic_table',
  target_lag=DownstreamLag(),
  warehouse='my_wh',
  query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
Copy

이 코드는 DynamicTableCollection 변수 dynamic_tables 를 생성하고 DynamicTableCollection.create 를 사용하여 Snowflake에 새 동적 테이블을 생성합니다.

다음 예제의 코드는 현재 지원되는 모든 옵션을 지정하여 my_db 데이터베이스 및 my_schema 스키마에서 my_dynamic_table2 이라는 동적 테이블을 나타내는 DynamicTable 오브젝트를 생성합니다.

from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTable(
      name='my_dynamic_table2',
      kind='PERMANENT',
      target_lag=UserDefinedLag(seconds=60),
      warehouse='my_wh',
      query='SELECT * FROM t',
      refresh_mode='FULL',
      initialize='ON_SCHEDULE',
      cluster_by=['id > 1'],
      comment='test table',
      data_retention_time_in_days=7,
      max_data_extension_time_in_days=7,
  )
)
Copy

동적 테이블 복제하기

다음 예제의 코드는 my_db 데이터베이스의 소스 동적 테이블 my_dynamic_tablemy_schema 스키마에서 동일한 열 정의와 모든 기존 데이터를 사용하여 my_dynamic_table2 이라는 이름의 새 동적 테이블을 생성합니다.

참고

이 복제 작업은 선택적 target_lagwarehouse 매개 변수를 포함하는 DynamicTableClone 오브젝트를 사용하며, 현재 다른 매개 변수는 지원하지 않습니다.

from snowflake.core.dynamic_table import DynamicTableClone

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTableClone(
      name='my_dynamic_table2',
      warehouse='my_wh2',
  ),
  clone_table='my_dynamic_table',
)
Copy

이 기능에 대한 자세한 내용은 CREATE DYNAMIC TABLE … CLONE 섹션을 참조하십시오.

동적 테이블 세부 정보 얻기

DynamicTable 오브젝트를 반환하는 DynamicTableResource.fetch 메서드를 호출하여 동적 테이블에 대한 정보를 얻을 수 있습니다.

다음 예제의 코드는 my_db 데이터베이스와 my_schema 스키마에서 이름이 my_dynamic_table 인 동적 테이블에 대한 정보를 가져옵니다.

dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
Copy

동적 테이블 나열하기

DynamicTable 오브젝트의 PagedIter 반복기를 반환하는 DynamicTableCollection.iter 메서드를 사용하여 동적 테이블을 나열할 수 있습니다.

다음 예제의 코드는 my_db 데이터베이스와 my_schema 스키마에서 이름이 my 텍스트로 시작하는 동적 테이블을 나열한 다음 각 테이블의 이름을 출력합니다.

from snowflake.core.dynamic_table import DynamicTableCollection

dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
  print(dt_obj.name)
Copy

동적 테이블 작업 수행

동적 테이블 새로 고침, 일시 중단 및 재개와 같은 일반적인 동적 테이블 작업을 DynamicTableResource 오브젝트를 사용하여 수행할 수 있습니다.

동적 테이블 리소스로 할 수 있는 일부 작업을 보여주기 위해 다음 예제의 코드는 다음을 수행합니다.

  1. my_dynamic_table 동적 테이블 리소스 오브젝트를 가져옵니다.

  2. 동적 테이블을 새로 고칩니다.

  3. 동적 테이블을 일시 중단합니다.

  4. 동적 테이블을 재개합니다.

  5. 동적 테이블을 삭제합니다.

my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]

my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()
Copy