Tutorial: Responda a perguntas sobre dados de receita de séries temporais com o Cortex Analyst

Introdução

Cortex Analyst transforma perguntas em linguagem natural sobre seus dados em resultados, gerando e executando consultas SQL. Este tutorial descreve como configurar o Cortex Analyst para responder a perguntas sobre um conjunto de dados de receita de séries temporais.

O que você aprenderá

  • Estabelecer um modelo semântico para o conjunto de dados.

  • Criar um aplicativo Streamlit que consulte Cortex Analyst.

Pré-requisitos

Os seguintes pré-requisitos são necessários para concluir este tutorial:

  • Você tem uma conta e um usuário Snowflake com uma função que concede os privilégios necessários para criar um banco de dados, esquema, tabelas, estágio e objetos de warehouse virtual.

  • Você configurou o Streamlit em seu sistema local.

Consulte o Snowflake em 20 minutos para obter instruções para atender a estes requisitos.

Etapa 1: Configuração

Como obter dados da amostra

Você usará um conjunto de dados de amostra e um modelo semântico correspondente baixado do Google Drive. Baixe os seguintes arquivos de dados para seu sistema:

  • daily_revenue_combined.csv

  • daily_revenue_by_region_combined.csv

  • daily_revenue_by_product_combined.csv

Baixe também o modelo semântico:

  • revenue_timeseries.yaml

Talvez você queira reservar um momento para dar uma olhada no arquivo YAML, que contém o modelo semântico para os dados. O modelo semântico complementa o esquema SQL de cada tabela com informações adicionais que ajudam o Cortex Analyst a entender perguntas sobre os dados. Para obter mais informações, consulte Especificação do modelo semântico Cortex Analyst.

Nota

Em uma configuração diferente de tutorial, você traria seus próprios dados, possivelmente já em uma tabela Snowflake, e desenvolveria seu próprio modelo semântico.

Criação dos objetos Snowflake

Use Snowsight, a UI do Snowflake, para criar os objetos Snowflake necessários para este tutorial. Depois de concluir o tutorial, você poderá descartar estes objetos.

Nota

Use uma função que possa criar bancos de dados, esquemas, warehouses, estágios e tabelas.

Para criar os objetos:

  1. Na interface do usuário Snowsight, selecione Worksheets na barra de navegação esquerda e, em seguida, selecione o botão +. Uma nova planilha SQL aparece.

  2. Cole o código SQL abaixo na planilha e selecione Run All no menu suspenso no canto superior direito da planilha.

CREATE OR REPLACE DATABASE cortex_analyst_demo;

CREATE OR REPLACE SCHEMA revenue_timeseries;

CREATE OR REPLACE WAREHOUSE cortex_analyst_wh
    WAREHOUSE_SIZE = 'large'
    WAREHOUSE_TYPE = 'standard'
    AUTO_SUSPEND = 60
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = TRUE
COMMENT = 'warehouse for cortex analyst demo';

USE WAREHOUSE cortex_analyst_wh;

CREATE STAGE raw_data DIRECTORY = (ENABLE = TRUE);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE (
    DATE DATE,
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT (
    DATE DATE,
    PRODUCT_LINE VARCHAR(16777216),
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION (
    DATE DATE,
    SALES_REGION VARCHAR(16777216),
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);
Copy

O SQL acima cria os seguintes objetos:

  • Um banco de dados chamado cortex_analyst_demo

  • Um esquema dentro desse banco de dados chamado revenue_timeseries

  • Três tabelas nesse esquema: DAILY_REVENUE, DAILY_REVENUE_BY_PRODUCT e DAILY_REVENUE_BY_REGION

  • Um estágio chamado raw_data que conterá os dados brutos que carregar nessas tabelas

  • Um warehouse virtual chamado cortex_analyst_wh

Observe que o warehouse virtual é inicialmente suspenso e será iniciado automaticamente quando necessário.

Etapa 2: Carregamento de dados para o Snowflake

Para obter os dados dos arquivos CSV no Snowflake, você os carregará no estágio e, em seguida, carregará os dados do estágio nas tabelas. Ao mesmo tempo, você carregará o arquivo YAML do modelo semântico para uso em uma etapa posterior.

Os arquivos que você irá carregar são:

  • daily_revenue_combined.csv

  • daily_revenue_by_region_combined.csv

  • daily_revenue_by_product_combined.csv

  • revenue_timeseries.yaml

Para carregar os arquivos em Snowsight:

  1. Na UI do Snowsight, selecione o Data icon na barra de navegação esquerda e depois Add Data. Na página Add Data, selecione Load files into a stage.

  2. Arraste os quatro arquivos que você baixou na etapa anterior para a janela Snowsight.

  3. Escolha o banco de dados cortex_analyst_demo e o estágio raw_data, depois selecione o botão Upload para carregar os arquivos.

Agora que você carregou os arquivos, carregue os dados dos arquivos CSV executando os comandos SQL abaixo em uma planilha do Snowsight.

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE
FROM @raw_data
FILES = ('daily_revenue_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT
FROM @raw_data
FILES = ('daily_revenue_by_product_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION
FROM @raw_data
FILES = ('daily_revenue_by_region_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
Copy

Etapa 3: crie um aplicativo Streamlit para se comunicar com seus dados por meio do Cortex Analyst

Para criar um aplicativo Streamlit que usa Cortex Analyst:

  1. Crie um arquivo Python localmente chamado analyst_demo.py.

  2. Copie o código abaixo no arquivo.

  3. Substitua os valores do espaço reservado pelos detalhes da sua conta.

  4. Execute o aplicativo Streamlit usando streamlit run analyst_demo.py.

from typing import Any, Dict, List, Optional

import pandas as pd
import requests
import snowflake.connector
import streamlit as st

DATABASE = "CORTEX_ANALYST_DEMO"
SCHEMA = "REVENUE_TIMESERIES"
STAGE = "RAW_DATA"
FILE = "revenue_timeseries.yaml"
WAREHOUSE = "cortex_analyst_wh"

# replace values below with your Snowflake connection information
HOST = "<host>"
ACCOUNT = "<account>"
USER = "<user>"
PASSWORD = "<password>"
ROLE = "<role>"

if 'CONN' not in st.session_state or st.session_state.CONN is None:
    st.session_state.CONN = snowflake.connector.connect(
        user=USER,
        password=PASSWORD,
        account=ACCOUNT,
        host=HOST,
        port=443,
        warehouse=WAREHOUSE,
        role=ROLE,
    )

def send_message(prompt: str) -> Dict[str, Any]:
    """Calls the REST API and returns the response."""
    request_body = {
        "messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}],
        "semantic_model_file": f"@{DATABASE}.{SCHEMA}.{STAGE}/{FILE}",
    }
    resp = requests.post(
        url=f"https://{HOST}/api/v2/cortex/analyst/message",
        json=request_body,
        headers={
            "Authorization": f'Snowflake Token="{st.session_state.CONN.rest.token}"',
            "Content-Type": "application/json",
        },
    )
    request_id = resp.headers.get("X-Snowflake-Request-Id")
    if resp.status_code < 400:
        return {**resp.json(), "request_id": request_id}  # type: ignore[arg-type]
    else:
        raise Exception(
            f"Failed request (id: {request_id}) with status {resp.status_code}: {resp.text}"
        )

def process_message(prompt: str) -> None:
    """Processes a message and adds the response to the chat."""
    st.session_state.messages.append(
        {"role": "user", "content": [{"type": "text", "text": prompt}]}
    )
    with st.chat_message("user"):
        st.markdown(prompt)
    with st.chat_message("assistant"):
        with st.spinner("Generating response..."):
            response = send_message(prompt=prompt)
            request_id = response["request_id"]
            content = response["message"]["content"]
            display_content(content=content, request_id=request_id)  # type: ignore[arg-type]
    st.session_state.messages.append(
        {"role": "assistant", "content": content, "request_id": request_id}
    )

def display_content(
    content: List[Dict[str, str]],
    request_id: Optional[str] = None,
    message_index: Optional[int] = None,
) -> None:
    """Displays a content item for a message."""
    message_index = message_index or len(st.session_state.messages)
    if request_id:
        with st.expander("Request ID", expanded=False):
            st.markdown(request_id)
    for item in content:
        if item["type"] == "text":
            st.markdown(item["text"])
        elif item["type"] == "suggestions":
            with st.expander("Suggestions", expanded=True):
                for suggestion_index, suggestion in enumerate(item["suggestions"]):
                    if st.button(suggestion, key=f"{message_index}_{suggestion_index}"):
                        st.session_state.active_suggestion = suggestion
        elif item["type"] == "sql":
            with st.expander("SQL Query", expanded=False):
                st.code(item["statement"], language="sql")
            with st.expander("Results", expanded=True):
                with st.spinner("Running SQL..."):
                    df = pd.read_sql(item["statement"], st.session_state.CONN)
                    if len(df.index) > 1:
                        data_tab, line_tab, bar_tab = st.tabs(
                            ["Data", "Line Chart", "Bar Chart"]
                        )
                        data_tab.dataframe(df)
                        if len(df.columns) > 1:
                            df = df.set_index(df.columns[0])
                        with line_tab:
                            st.line_chart(df)
                        with bar_tab:
                            st.bar_chart(df)
                    else:
                        st.dataframe(df)

st.title("Cortex Analyst")
st.markdown(f"Semantic Model: `{FILE}`")

if "messages" not in st.session_state:
    st.session_state.messages = []
    st.session_state.suggestions = []
    st.session_state.active_suggestion = None

for message_index, message in enumerate(st.session_state.messages):
    with st.chat_message(message["role"]):
        display_content(
            content=message["content"],
            request_id=message.get("request_id"),
            message_index=message_index,
        )

if user_input := st.chat_input("What is your question?"):
    process_message(prompt=user_input)

if st.session_state.active_suggestion:
    process_message(prompt=st.session_state.active_suggestion)
    st.session_state.active_suggestion = None
Copy

Ao executar o aplicativo, ele solicita que você insira uma pergunta. Comece com “Quais perguntas posso fazer?” e experimente algumas das sugestões.

Etapa 4: limpeza

Limpeza (opcional)

Execute os seguintes comandos DROP <objeto> para retornar o sistema ao seu estado antes de iniciar o tutorial:

DROP DATABASE IF EXISTS cortex_analyst_demo;
DROP WAREHOUSE IF EXISTS cortex_analyst_wh;
Copy

Descartar o banco de dados remove automaticamente todos os objetos do banco de dados filho, tais como tabelas.

Próximos passos

Parabéns! Você criou com sucesso um aplicativo Cortex Analyst simples para “conversar com seus dados” no Snowflake.

Recursos adicionais

Continue aprendendo usando os seguintes recursos: