자습서: Cortex Analyst를 사용한 시계열 수익 데이터에 대한 질문에 답하기

소개

Cortex Analyst 는 데이터에 대한 자연어 질문을 SQL 쿼리를 생성하고 실행하여 결과로 변환합니다. 이 자습서에서는 시계열 매출 데이터 집합에 대한 질문에 응답하도록 Cortex Analyst 를 설정하는 방법을 설명합니다.

알아볼 내용

  • 데이터 집합에 대한 의미 체계 모델을 설정합니다.

  • Cortex Analyst 를 쿼리하는 Streamlit 앱을 생성합니다.

전제 조건

이 자습서를 완료하려면 다음과 같은 필수 조건이 필요합니다.

  • 데이터베이스, 스키마, 테이블, 스테이지 및 가상 웨어하우스 오브젝트를 생성하는 데 필요한 권한을 부여하는 역할이 있는 Snowflake 계정과 사용자가 있습니다.

  • 로컬 시스템에 Streamlit 이 설정되어 있습니다.

이러한 요구 사항을 충족하기 위한 지침은 20분만에 Snowflake 시작하기 섹션을 참조하십시오.

1단계: 설정

샘플 데이터 가져오기

Google Drive 에서 다운로드한 샘플 데이터 세트와 해당 의미 체계 모델을 사용합니다. 다음 데이터 파일을 시스템에 다운로드합니다.

  • daily_revenue_combined.csv

  • daily_revenue_by_region_combined.csv

  • daily_revenue_by_product_combined.csv

의미 체계 모델도 다운로드합니다.

  • revenue_timeseries.yaml

잠시 시간을 내어 데이터의 의미 체계가 포함된 YAML 파일을 살펴보는 것이 좋습니다. 의미 체계 모델은 Cortex Analyst 가 데이터에 대한 질문을 이해하는 데 도움이 되는 추가 정보로 각 테이블의 SQL 스키마를 보완합니다. 자세한 내용은 Cortex Analyst 의미 체계 모델 사양 섹션을 참조하십시오.

참고

자습서이 아닌 환경이라면 Snowflake 테이블에 이미 저장되어 있는 데이터를 가져와서 자체 의미 체계 모델을 개발할 수 있습니다.

Snowflake 오브젝트 만들기

Snowflake UI인 Snowsight 를 사용하여 이 자습서에 필요한 Snowflake 오브젝트를 생성합니다. 자습서를 마친 후 이러한 오브젝트를 삭제할 수 있습니다.

참고

데이터베이스, 스키마, 웨어하우스, 스테이지, 테이블을 생성할 수 있는 역할을 사용합니다.

오브젝트를 생성하려면:

  1. Snowsight 사용자 인터페이스의 왼쪽 탐색 모음에서 Worksheets 를 선택한 다음 + 버튼을 선택합니다. 새로운 SQL 워크시트가 표시됩니다.

  2. 아래 SQL 코드를 워크시트에 붙여넣은 다음 워크시트 오른쪽 상단의 드롭다운 메뉴에서 Run All 을 선택합니다.

CREATE OR REPLACE DATABASE cortex_analyst_demo;

CREATE OR REPLACE SCHEMA revenue_timeseries;

CREATE OR REPLACE WAREHOUSE cortex_analyst_wh
    WAREHOUSE_SIZE = 'large'
    WAREHOUSE_TYPE = 'standard'
    AUTO_SUSPEND = 60
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = TRUE
COMMENT = 'warehouse for cortex analyst demo';

USE WAREHOUSE cortex_analyst_wh;

CREATE STAGE raw_data DIRECTORY = (ENABLE = TRUE);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE (
    DATE DATE,
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT (
    DATE DATE,
    PRODUCT_LINE VARCHAR(16777216),
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION (
    DATE DATE,
    SALES_REGION VARCHAR(16777216),
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);
Copy

위의 SQL은 다음과 같은 오브젝트를 생성합니다.

  • cortex_analyst_demo 데이터베이스

  • revenue_timeseries 를 호출한 해당 데이터베이스 내의 스키마.

  • 해당 스키마의 3개 테이블(DAILY_REVENUE, DAILY_REVENUE_BY_PRODUCT 및 DAILY_REVENUE_BY_REGION).

  • 이 테이블에 로드할 원시 데이터를 저장할 raw_data 스테이지.

  • cortex_analyst_wh 가상 웨어하우스

가상 웨어하우스는 처음에는 일시 중단되어 있지만, 필요할 때 자동으로 시작됩니다.

2단계: Snowflake에 데이터 로드

CSV 파일의 데이터를 Snowflake로 가져오려면 스테이지에 업로드한 다음 스테이지의 데이터를 테이블에 로드합니다. 동시에 이후 단계에서 사용할 수 있도록 의미 체계 모델 YAML 파일을 업로드합니다.

업로드할 파일은 다음과 같습니다.

  • daily_revenue_combined.csv

  • daily_revenue_by_region_combined.csv

  • daily_revenue_by_product_combined.csv

  • revenue_timeseries.yaml

Snowsight 의 파일을 업로드하려면:

  1. Snowsight UI에서 왼쪽 탐색 모음의 Data icon 을 선택한 다음 Add Data 를 선택합니다. Add Data 페이지에서 Load files into a stage 를 선택합니다.

  2. 이전 단계에서 다운로드한 4개의 파일을 Snowsight 창으로 끌어서 놓습니다.

  3. cortex_analyst_demo 데이터베이스 및 raw_data 스테이지를 선택한 다음 Upload 버튼을 선택하여 파일을 업로드합니다.

파일을 업로드했으니, 이제 Snowsight 워크시트에서 아래 SQL 명령을 실행하여 CSV 파일의 데이터를 로드합니다.

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE
FROM @raw_data
FILES = ('daily_revenue_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT
FROM @raw_data
FILES = ('daily_revenue_by_product_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION
FROM @raw_data
FILES = ('daily_revenue_by_region_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
Copy

3단계: Cortex Analyst 를 통해 데이터와 통신할 Streamlit 앱 만들기

Cortex Analyst 를 사용하는 Streamlit 앱을 생성하려면 다음을 수행합니다.

  1. 로컬에 analyst_demo.py 라는 Python 파일을 생성합니다.

  2. 아래 코드를 파일에 복사합니다.

  3. 자리 표시자 값을 사용자의 계정 세부 정보로 바꿉니다.

  4. streamlit run analyst_demo.py 를 사용하여 Streamlit 앱을 실행합니다.

from typing import Any, Dict, List, Optional

import pandas as pd
import requests
import snowflake.connector
import streamlit as st

DATABASE = "CORTEX_ANALYST_DEMO"
SCHEMA = "REVENUE_TIMESERIES"
STAGE = "RAW_DATA"
FILE = "revenue_timeseries.yaml"
WAREHOUSE = "cortex_analyst_wh"

# replace values below with your Snowflake connection information
HOST = "<host>"
ACCOUNT = "<account>"
USER = "<user>"
PASSWORD = "<password>"
ROLE = "<role>"

if 'CONN' not in st.session_state or st.session_state.CONN is None:
    st.session_state.CONN = snowflake.connector.connect(
        user=USER,
        password=PASSWORD,
        account=ACCOUNT,
        host=HOST,
        port=443,
        warehouse=WAREHOUSE,
        role=ROLE,
    )

def send_message(prompt: str) -> Dict[str, Any]:
    """Calls the REST API and returns the response."""
    request_body = {
        "messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}],
        "semantic_model_file": f"@{DATABASE}.{SCHEMA}.{STAGE}/{FILE}",
    }
    resp = requests.post(
        url=f"https://{HOST}/api/v2/cortex/analyst/message",
        json=request_body,
        headers={
            "Authorization": f'Snowflake Token="{st.session_state.CONN.rest.token}"',
            "Content-Type": "application/json",
        },
    )
    request_id = resp.headers.get("X-Snowflake-Request-Id")
    if resp.status_code < 400:
        return {**resp.json(), "request_id": request_id}  # type: ignore[arg-type]
    else:
        raise Exception(
            f"Failed request (id: {request_id}) with status {resp.status_code}: {resp.text}"
        )

def process_message(prompt: str) -> None:
    """Processes a message and adds the response to the chat."""
    st.session_state.messages.append(
        {"role": "user", "content": [{"type": "text", "text": prompt}]}
    )
    with st.chat_message("user"):
        st.markdown(prompt)
    with st.chat_message("assistant"):
        with st.spinner("Generating response..."):
            response = send_message(prompt=prompt)
            request_id = response["request_id"]
            content = response["message"]["content"]
            display_content(content=content, request_id=request_id)  # type: ignore[arg-type]
    st.session_state.messages.append(
        {"role": "assistant", "content": content, "request_id": request_id}
    )

def display_content(
    content: List[Dict[str, str]],
    request_id: Optional[str] = None,
    message_index: Optional[int] = None,
) -> None:
    """Displays a content item for a message."""
    message_index = message_index or len(st.session_state.messages)
    if request_id:
        with st.expander("Request ID", expanded=False):
            st.markdown(request_id)
    for item in content:
        if item["type"] == "text":
            st.markdown(item["text"])
        elif item["type"] == "suggestions":
            with st.expander("Suggestions", expanded=True):
                for suggestion_index, suggestion in enumerate(item["suggestions"]):
                    if st.button(suggestion, key=f"{message_index}_{suggestion_index}"):
                        st.session_state.active_suggestion = suggestion
        elif item["type"] == "sql":
            with st.expander("SQL Query", expanded=False):
                st.code(item["statement"], language="sql")
            with st.expander("Results", expanded=True):
                with st.spinner("Running SQL..."):
                    df = pd.read_sql(item["statement"], st.session_state.CONN)
                    if len(df.index) > 1:
                        data_tab, line_tab, bar_tab = st.tabs(
                            ["Data", "Line Chart", "Bar Chart"]
                        )
                        data_tab.dataframe(df)
                        if len(df.columns) > 1:
                            df = df.set_index(df.columns[0])
                        with line_tab:
                            st.line_chart(df)
                        with bar_tab:
                            st.bar_chart(df)
                    else:
                        st.dataframe(df)

st.title("Cortex Analyst")
st.markdown(f"Semantic Model: `{FILE}`")

if "messages" not in st.session_state:
    st.session_state.messages = []
    st.session_state.suggestions = []
    st.session_state.active_suggestion = None

for message_index, message in enumerate(st.session_state.messages):
    with st.chat_message(message["role"]):
        display_content(
            content=message["content"],
            request_id=message.get("request_id"),
            message_index=message_index,
        )

if user_input := st.chat_input("What is your question?"):
    process_message(prompt=user_input)

if st.session_state.active_suggestion:
    process_message(prompt=st.session_state.active_suggestion)
    st.session_state.active_suggestion = None
Copy

앱을 실행하면 질문을 입력하라는 메시지가 표시됩니다. “What questions can I ask?”로 시작하여 몇 가지 제안을 시도해 보십시오.

4단계: 정리

정리(선택 사항)

자습서를 시작하기 전의 상태로 시스템을 되돌리려면 다음 DROP <오브젝트> 명령을 실행합니다.

DROP DATABASE IF EXISTS cortex_analyst_demo;
DROP WAREHOUSE IF EXISTS cortex_analyst_wh;
Copy

데이터베이스를 삭제하면 테이블과 같은 모든 하위 데이터베이스 오브젝트가 자동으로 제거됩니다.

다음 단계

축하합니다! Snowflake에서 “데이터와 대화”하는 간단한 Cortex Analyst 앱을 성공적으로 구축했습니다.

추가 리소스

다음 리소스를 활용하여 계속 알아보십시오.