자습서: Cortex Analyst를 사용한 시계열 수익 데이터에 대한 질문에 답하기¶
소개¶
Cortex Analyst 는 데이터에 대한 자연어 질문을 SQL 쿼리를 생성하고 실행하여 결과로 변환합니다. 이 자습서에서는 시계열 매출 데이터 집합에 대한 질문에 응답하도록 Cortex Analyst 를 설정하는 방법을 설명합니다.
알아볼 내용¶
데이터 집합에 대한 의미 체계 모델을 설정합니다.
Cortex Analyst 를 쿼리하는 Streamlit 앱을 생성합니다.
전제 조건¶
이 자습서를 완료하려면 다음과 같은 필수 조건이 필요합니다.
데이터베이스, 스키마, 테이블, 스테이지 및 가상 웨어하우스 오브젝트를 생성하는 데 필요한 권한을 부여하는 역할이 있는 Snowflake 계정과 사용자가 있습니다.
로컬 시스템에 Streamlit 이 설정되어 있습니다.
이러한 요구 사항을 충족하기 위한 지침은 20분만에 Snowflake 시작하기 섹션을 참조하십시오.
1단계: 설정¶
샘플 데이터 가져오기¶
Google Drive 에서 다운로드한 샘플 데이터 세트와 해당 의미 체계 모델을 사용합니다. 다음 데이터 파일을 시스템에 다운로드합니다.
daily_revenue_combined.csv
daily_revenue_by_region_combined.csv
daily_revenue_by_product_combined.csv
의미 체계 모델도 다운로드합니다.
revenue_timeseries.yaml
잠시 시간을 내어 데이터의 의미 체계가 포함된 YAML 파일을 살펴보는 것이 좋습니다. 의미 체계 모델은 Cortex Analyst 가 데이터에 대한 질문을 이해하는 데 도움이 되는 추가 정보로 각 테이블의 SQL 스키마를 보완합니다. 자세한 내용은 Cortex Analyst 의미 체계 모델 사양 섹션을 참조하십시오.
참고
자습서이 아닌 환경이라면 Snowflake 테이블에 이미 저장되어 있는 데이터를 가져와서 자체 의미 체계 모델을 개발할 수 있습니다.
Snowflake 오브젝트 만들기¶
Snowflake UI인 Snowsight 를 사용하여 이 자습서에 필요한 Snowflake 오브젝트를 생성합니다. 자습서를 마친 후 이러한 오브젝트를 삭제할 수 있습니다.
참고
데이터베이스, 스키마, 웨어하우스, 스테이지, 테이블을 생성할 수 있는 역할을 사용합니다.
오브젝트를 생성하려면:
Snowsight 사용자 인터페이스의 왼쪽 탐색 모음에서 Worksheets 를 선택한 다음 + 버튼을 선택합니다. 새로운 SQL 워크시트가 표시됩니다.
아래 SQL 코드를 워크시트에 붙여넣은 다음 워크시트 오른쪽 상단의 드롭다운 메뉴에서 Run All 을 선택합니다.
CREATE OR REPLACE DATABASE cortex_analyst_demo;
CREATE OR REPLACE SCHEMA revenue_timeseries;
CREATE OR REPLACE WAREHOUSE cortex_analyst_wh
WAREHOUSE_SIZE = 'large'
WAREHOUSE_TYPE = 'standard'
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE
COMMENT = 'warehouse for cortex analyst demo';
USE WAREHOUSE cortex_analyst_wh;
CREATE STAGE raw_data DIRECTORY = (ENABLE = TRUE);
CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE (
DATE DATE,
REVENUE FLOAT,
COGS FLOAT,
FORECASTED_REVENUE FLOAT
);
CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT (
DATE DATE,
PRODUCT_LINE VARCHAR(16777216),
REVENUE FLOAT,
COGS FLOAT,
FORECASTED_REVENUE FLOAT
);
CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION (
DATE DATE,
SALES_REGION VARCHAR(16777216),
REVENUE FLOAT,
COGS FLOAT,
FORECASTED_REVENUE FLOAT
);
위의 SQL은 다음과 같은 오브젝트를 생성합니다.
cortex_analyst_demo
데이터베이스revenue_timeseries
를 호출한 해당 데이터베이스 내의 스키마.해당 스키마의 3개 테이블(DAILY_REVENUE, DAILY_REVENUE_BY_PRODUCT 및 DAILY_REVENUE_BY_REGION).
이 테이블에 로드할 원시 데이터를 저장할
raw_data
스테이지.cortex_analyst_wh
가상 웨어하우스
가상 웨어하우스는 처음에는 일시 중단되어 있지만, 필요할 때 자동으로 시작됩니다.
2단계: Snowflake에 데이터 로드¶
CSV 파일의 데이터를 Snowflake로 가져오려면 스테이지에 업로드한 다음 스테이지의 데이터를 테이블에 로드합니다. 동시에 이후 단계에서 사용할 수 있도록 의미 체계 모델 YAML 파일을 업로드합니다.
업로드할 파일은 다음과 같습니다.
daily_revenue_combined.csv
daily_revenue_by_region_combined.csv
daily_revenue_by_product_combined.csv
revenue_timeseries.yaml
Snowsight 의 파일을 업로드하려면:
Snowsight UI에서 왼쪽 탐색 모음의 Data icon 을 선택한 다음 Add Data 를 선택합니다. Add Data 페이지에서 Load files into a stage 를 선택합니다.
이전 단계에서 다운로드한 4개의 파일을 Snowsight 창으로 끌어서 놓습니다.
cortex_analyst_demo
데이터베이스 및raw_data
스테이지를 선택한 다음 Upload 버튼을 선택하여 파일을 업로드합니다.
파일을 업로드했으니, 이제 Snowsight 워크시트에서 아래 SQL 명령을 실행하여 CSV 파일의 데이터를 로드합니다.
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE
FROM @raw_data
FILES = ('daily_revenue_combined.csv')
FILE_FORMAT = (
TYPE=CSV,
SKIP_HEADER=1,
FIELD_DELIMITER=',',
TRIM_SPACE=FALSE,
FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
REPLACE_INVALID_CHARACTERS=TRUE,
DATE_FORMAT=AUTO,
TIME_FORMAT=AUTO,
TIMESTAMP_FORMAT=AUTO
EMPTY_FIELD_AS_NULL = FALSE
error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT
FROM @raw_data
FILES = ('daily_revenue_by_product_combined.csv')
FILE_FORMAT = (
TYPE=CSV,
SKIP_HEADER=1,
FIELD_DELIMITER=',',
TRIM_SPACE=FALSE,
FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
REPLACE_INVALID_CHARACTERS=TRUE,
DATE_FORMAT=AUTO,
TIME_FORMAT=AUTO,
TIMESTAMP_FORMAT=AUTO
EMPTY_FIELD_AS_NULL = FALSE
error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION
FROM @raw_data
FILES = ('daily_revenue_by_region_combined.csv')
FILE_FORMAT = (
TYPE=CSV,
SKIP_HEADER=1,
FIELD_DELIMITER=',',
TRIM_SPACE=FALSE,
FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
REPLACE_INVALID_CHARACTERS=TRUE,
DATE_FORMAT=AUTO,
TIME_FORMAT=AUTO,
TIMESTAMP_FORMAT=AUTO
EMPTY_FIELD_AS_NULL = FALSE
error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
3단계: Cortex Analyst 를 통해 데이터와 통신할 Streamlit 앱 만들기¶
Cortex Analyst 를 사용하는 Streamlit 앱을 생성하려면 다음을 수행합니다.
로컬에
analyst_demo.py
라는 Python 파일을 생성합니다.아래 코드를 파일에 복사합니다.
자리 표시자 값을 사용자의 계정 세부 정보로 바꿉니다.
streamlit run analyst_demo.py
를 사용하여 Streamlit 앱을 실행합니다.
from typing import Any, Dict, List, Optional
import pandas as pd
import requests
import snowflake.connector
import streamlit as st
DATABASE = "CORTEX_ANALYST_DEMO"
SCHEMA = "REVENUE_TIMESERIES"
STAGE = "RAW_DATA"
FILE = "revenue_timeseries.yaml"
WAREHOUSE = "cortex_analyst_wh"
# replace values below with your Snowflake connection information
HOST = "<host>"
ACCOUNT = "<account>"
USER = "<user>"
PASSWORD = "<password>"
ROLE = "<role>"
if 'CONN' not in st.session_state or st.session_state.CONN is None:
st.session_state.CONN = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
host=HOST,
port=443,
warehouse=WAREHOUSE,
role=ROLE,
)
def send_message(prompt: str) -> Dict[str, Any]:
"""Calls the REST API and returns the response."""
request_body = {
"messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}],
"semantic_model_file": f"@{DATABASE}.{SCHEMA}.{STAGE}/{FILE}",
}
resp = requests.post(
url=f"https://{HOST}/api/v2/cortex/analyst/message",
json=request_body,
headers={
"Authorization": f'Snowflake Token="{st.session_state.CONN.rest.token}"',
"Content-Type": "application/json",
},
)
request_id = resp.headers.get("X-Snowflake-Request-Id")
if resp.status_code < 400:
return {**resp.json(), "request_id": request_id} # type: ignore[arg-type]
else:
raise Exception(
f"Failed request (id: {request_id}) with status {resp.status_code}: {resp.text}"
)
def process_message(prompt: str) -> None:
"""Processes a message and adds the response to the chat."""
st.session_state.messages.append(
{"role": "user", "content": [{"type": "text", "text": prompt}]}
)
with st.chat_message("user"):
st.markdown(prompt)
with st.chat_message("assistant"):
with st.spinner("Generating response..."):
response = send_message(prompt=prompt)
request_id = response["request_id"]
content = response["message"]["content"]
display_content(content=content, request_id=request_id) # type: ignore[arg-type]
st.session_state.messages.append(
{"role": "assistant", "content": content, "request_id": request_id}
)
def display_content(
content: List[Dict[str, str]],
request_id: Optional[str] = None,
message_index: Optional[int] = None,
) -> None:
"""Displays a content item for a message."""
message_index = message_index or len(st.session_state.messages)
if request_id:
with st.expander("Request ID", expanded=False):
st.markdown(request_id)
for item in content:
if item["type"] == "text":
st.markdown(item["text"])
elif item["type"] == "suggestions":
with st.expander("Suggestions", expanded=True):
for suggestion_index, suggestion in enumerate(item["suggestions"]):
if st.button(suggestion, key=f"{message_index}_{suggestion_index}"):
st.session_state.active_suggestion = suggestion
elif item["type"] == "sql":
with st.expander("SQL Query", expanded=False):
st.code(item["statement"], language="sql")
with st.expander("Results", expanded=True):
with st.spinner("Running SQL..."):
df = pd.read_sql(item["statement"], st.session_state.CONN)
if len(df.index) > 1:
data_tab, line_tab, bar_tab = st.tabs(
["Data", "Line Chart", "Bar Chart"]
)
data_tab.dataframe(df)
if len(df.columns) > 1:
df = df.set_index(df.columns[0])
with line_tab:
st.line_chart(df)
with bar_tab:
st.bar_chart(df)
else:
st.dataframe(df)
st.title("Cortex Analyst")
st.markdown(f"Semantic Model: `{FILE}`")
if "messages" not in st.session_state:
st.session_state.messages = []
st.session_state.suggestions = []
st.session_state.active_suggestion = None
for message_index, message in enumerate(st.session_state.messages):
with st.chat_message(message["role"]):
display_content(
content=message["content"],
request_id=message.get("request_id"),
message_index=message_index,
)
if user_input := st.chat_input("What is your question?"):
process_message(prompt=user_input)
if st.session_state.active_suggestion:
process_message(prompt=st.session_state.active_suggestion)
st.session_state.active_suggestion = None
앱을 실행하면 질문을 입력하라는 메시지가 표시됩니다. “What questions can I ask?”로 시작하여 몇 가지 제안을 시도해 보십시오.
4단계: 정리¶
정리(선택 사항)¶
자습서를 시작하기 전의 상태로 시스템을 되돌리려면 다음 DROP <오브젝트> 명령을 실행합니다.
DROP DATABASE IF EXISTS cortex_analyst_demo;
DROP WAREHOUSE IF EXISTS cortex_analyst_wh;
데이터베이스를 삭제하면 테이블과 같은 모든 하위 데이터베이스 오브젝트가 자동으로 제거됩니다.
다음 단계¶
축하합니다! Snowflake에서 “데이터와 대화”하는 간단한 Cortex Analyst 앱을 성공적으로 구축했습니다.
추가 리소스¶
다음 리소스를 활용하여 계속 알아보십시오.