Tutorial: Beantworten Sie Fragen zu Zeitserienumsatzdaten mit Cortex Analyst

Einführung

Cortex Analyst wandelt Fragen in natürlicher Sprache zu Ihren Daten in Ergebnisse um, indem es SQL-Abfragen erstellt und ausführt. In diesem Tutorial wird beschrieben, wie Cortex Analyst eingerichtet wird, dass es Fragen zu einem Zeitreihen-Umsatz-Datenset beantwortet.

Lerninhalte

  • Erstellen eines semantisches Modells für das Datenset.

  • Erstellen einer Streamlit-App, die Cortex Analyst abfragt.

Voraussetzungen

Die folgenden Voraussetzungen müssen erfüllt sein, um dieses Tutorial abzuschließen:

  • Sie haben ein Snowflake-Konto sowie einen Benutzer mit einer Rolle, die die erforderlichen Berechtigungen zum Erstellen von Datenbank-, Schema-, Tabellen-, Stagingbereich- und virtuellen Warehouse-Objekten erteilt.

  • Sie haben Streamlit auf Ihrem lokalen System eingerichtet.

Eine Anleitung, wie Sie diese Anforderungen erfüllen können, finden Sie unter Snowflake in 20 Minuten.

Schritt 1: Setup

Abrufen der Beispieldaten

Sie werden einen Beispieldatensatz verwenden, den Sie von GitHub heruntergeladen haben. Laden Sie die folgenden Datendateien auf Ihr System herunter:

  • daily_revenue.csv

  • product.csv

  • region.csv

Laden Sie auch das semantische YAML-Modell von GitHub herunter.

Bevor Sie fortfahren, sollten Sie einen Blick auf dieses semantische Modell werfen. Das semantische Modell ergänzt das SQL-Schema jeder Tabelle mit zusätzlichen Informationen, die Cortex Analyst helfen, Fragen zu den Daten zu verstehen. Weitere Informationen dazu finden Sie unter Cortex Analyst-Spezifikation des semantischen Modells.

Bemerkung

In einer nicht-tutoriellen Umgebung würden Sie Ihre eigenen Daten mitbringen, möglicherweise bereits in einer Snowflake-Tabelle, und Ihr eigenes semantisches Modell entwickeln.

Erstellen der Snowflake Objekte

Verwenden Sie Snowsight, die Snowflake-UI, um die für dieses Tutorial benötigten Snowflake-Objekte zu erstellen. Nachdem Sie das Tutorial abgeschlossen haben, können Sie diese Objekte wieder löschen.

Bemerkung

Verwenden Sie eine Rolle, die Datenbanken, Schemas, Warehouses, Stagingbereiche und Tabellen erstellen kann.

So erstellen Sie die Objekte:

  1. Wählen Sie auf der Weboberfläche Snowsight in der linken Navigationsleiste Worksheets und dann die Schaltfläche +. Ein neues SQL-Arbeitsblatt wird angezeigt.

  2. Fügen Sie den unten stehenden SQL-Code in das Arbeitsblatt ein und wählen Sie dann Run All aus dem Dropdown-Menü oben rechts im Arbeitsblatt aus.

/*--
• Database, schema, warehouse, and stage creation
--*/

USE ROLE SECURITYADMIN;

CREATE ROLE IF NOT EXISTS cortex_user_role;
GRANT DATABASE ROLE SNOWFLAKE.CORTEX_USER TO ROLE cortex_user_role;

GRANT ROLE cortex_user_role TO USER <user>;

USE ROLE sysadmin;

-- Create demo database
CREATE OR REPLACE DATABASE cortex_analyst_demo;

-- Create schema
CREATE OR REPLACE SCHEMA cortex_analyst_demo.revenue_timeseries;

-- Create warehouse
CREATE OR REPLACE WAREHOUSE cortex_analyst_wh
    WAREHOUSE_SIZE = 'large'
    WAREHOUSE_TYPE = 'standard'
    AUTO_SUSPEND = 60
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = TRUE
COMMENT = 'Warehouse for Cortex Analyst demo';

GRANT USAGE ON WAREHOUSE cortex_analyst_wh TO ROLE cortex_user_role;
GRANT OPERATE ON WAREHOUSE cortex_analyst_wh TO ROLE cortex_user_role;

GRANT OWNERSHIP ON SCHEMA cortex_analyst_demo.revenue_timeseries TO ROLE cortex_user_role;
GRANT OWNERSHIP ON DATABASE cortex_analyst_demo TO ROLE cortex_user_role;


USE ROLE cortex_user_role;

-- Use the created warehouse
USE WAREHOUSE cortex_analyst_wh;

USE DATABASE cortex_analyst_demo;
USE SCHEMA cortex_analyst_demo.revenue_timeseries;

-- Create stage for raw data
CREATE OR REPLACE STAGE raw_data DIRECTORY = (ENABLE = TRUE);

/*--
• Fact and Dimension Table Creation
--*/

-- Fact table: daily_revenue
CREATE OR REPLACE TABLE cortex_analyst_demo.revenue_timeseries.daily_revenue (
    date DATE,
    revenue FLOAT,
    cogs FLOAT,
    forecasted_revenue FLOAT,
    product_id INT,
    region_id INT
);

-- Dimension table: product_dim
CREATE OR REPLACE TABLE cortex_analyst_demo.revenue_timeseries.product_dim (
    product_id INT,
    product_line VARCHAR(16777216)
);

-- Dimension table: region_dim
CREATE OR REPLACE TABLE cortex_analyst_demo.revenue_timeseries.region_dim (
    region_id INT,
    sales_region VARCHAR(16777216),
    state VARCHAR(16777216)
);
Copy

Die obige SQL-Anweisung erstellt die folgenden Objekte:

  • Eine Datenbank mit dem Namen cortex_analyst_demo

  • Ein Schema innerhalb dieser Datenbank mit dem Namen revenue_timeseries

  • Drei Tabellen in diesem Schema: daily_revenue, product_dim und region_dim

  • Ein Stagingbereich mit dem Namen raw_data, der die Rohdaten enthält, die wir in diese Tabellen laden werden

  • Ein virtuelles Warehouse namens cortex_analyst_wh

Bemerkung

Das virtuelle Warehouse ist vorläufig angehalten. Es wird automatisch gestartet, wenn Sie eine Abfrage ausführen.

Schritt 2: Daten in Snowflake laden

Um die Daten aus den CSV-Dateien in Snowflake zu erhalten, laden Sie diese in den Stagingbereich hoch, und laden Sie dann die Daten aus dem Stagingbereich in die Tabellen. Gleichzeitig laden Sie die Datei des semantischen YAML-Modells zur Verwendung in einem späteren Schritt hoch.

Die Dateien, die Sie hochladen werden, sind:

  • daily_revenue.csv

  • product.csv

  • region.csv

  • revenue_timeseries.yaml

So laden Sie die Dateien in das Snowsight hoch:

  1. Wählen Sie auf der Snowsight UI in der linken Navigationsleiste Data icon und dann Add Data aus. Wählen Sie auf der Seite Add Data die Option Load files into a stage.

  2. Ziehen Sie die vier Dateien, die Sie im vorherigen Schritt heruntergeladen haben, in das Snowsight-Fenster.

  3. Wählen Sie die Datenbank cortex_analyst_demo und den Stagingbereich raw_data, und klicken Sie dann auf die Schaltfläche Upload, um die Dateien hochzuladen.

Nachdem Sie nun die Dateien hochgeladen haben, laden Sie die Daten aus den CSV-Dateien, indem Sie die folgenden SQL-Befehle in einem Snowsight-Arbeitsblatt ausführen.

USE WAREHOUSE cortex_analyst_wh;

COPY INTO cortex_analyst_demo.revenue_timeseries.daily_revenue
FROM @raw_data
FILES = ('daily_revenue.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;

COPY INTO cortex_analyst_demo.revenue_timeseries.product_dim
FROM @raw_data
FILES = ('product.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;

COPY INTO cortex_analyst_demo.revenue_timeseries.region_dim
FROM @raw_data
FILES = ('region.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
Copy

Bemerkung

Im Ausgabebereich wird nur das Ergebnis des letzten Befehls angezeigt. Sie können die Befehle Zeile für Zeile ausführen, um die Ergebnisse der einzelnen Befehle zu sehen.

Schritt 3: Erstellen Sie eine Streamlit-App, um über Cortex Analyst mit Ihren Daten zu kommunizieren

So erstellen Sie eine Streamlit-App, die Cortex Analyst verwendet:

  1. Erstellen Sie lokal eine Python-Datei mit dem Namen analyst_demo.py.

  2. Kopieren Sie den folgenden Code in die Datei.

  3. Ersetzen Sie die Platzhalterwerte durch Ihre Kontodaten.

  4. Führen Sie die Streamlit-App mit streamlit run analyst_demo.py aus.

from typing import Any, Dict, List, Optional

import pandas as pd
import requests
import snowflake.connector
import streamlit as st

DATABASE = "CORTEX_ANALYST_DEMO"
SCHEMA = "REVENUE_TIMESERIES"
STAGE = "RAW_DATA"
FILE = "revenue_timeseries.yaml"
WAREHOUSE = "cortex_analyst_wh"

# replace values below with your Snowflake connection information
HOST = "<host>"
ACCOUNT = "<account>"
USER = "<user>"
PASSWORD = "<password>"
ROLE = "<role>"

if 'CONN' not in st.session_state or st.session_state.CONN is None:
    st.session_state.CONN = snowflake.connector.connect(
        user=USER,
        password=PASSWORD,
        account=ACCOUNT,
        host=HOST,
        port=443,
        warehouse=WAREHOUSE,
        role=ROLE,
    )

def send_message(prompt: str) -> Dict[str, Any]:
    """Calls the REST API and returns the response."""
    request_body = {
        "messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}],
        "semantic_model_file": f"@{DATABASE}.{SCHEMA}.{STAGE}/{FILE}",
    }
    resp = requests.post(
        url=f"https://{HOST}/api/v2/cortex/analyst/message",
        json=request_body,
        headers={
            "Authorization": f'Snowflake Token="{st.session_state.CONN.rest.token}"',
            "Content-Type": "application/json",
        },
    )
    request_id = resp.headers.get("X-Snowflake-Request-Id")
    if resp.status_code < 400:
        return {**resp.json(), "request_id": request_id}  # type: ignore[arg-type]
    else:
        raise Exception(
            f"Failed request (id: {request_id}) with status {resp.status_code}: {resp.text}"
        )

def process_message(prompt: str) -> None:
    """Processes a message and adds the response to the chat."""
    st.session_state.messages.append(
        {"role": "user", "content": [{"type": "text", "text": prompt}]}
    )
    with st.chat_message("user"):
        st.markdown(prompt)
    with st.chat_message("assistant"):
        with st.spinner("Generating response..."):
            response = send_message(prompt=prompt)
            request_id = response["request_id"]
            content = response["message"]["content"]
            display_content(content=content, request_id=request_id)  # type: ignore[arg-type]
    st.session_state.messages.append(
        {"role": "assistant", "content": content, "request_id": request_id}
    )

def display_content(
    content: List[Dict[str, str]],
    request_id: Optional[str] = None,
    message_index: Optional[int] = None,
) -> None:
    """Displays a content item for a message."""
    message_index = message_index or len(st.session_state.messages)
    if request_id:
        with st.expander("Request ID", expanded=False):
            st.markdown(request_id)
    for item in content:
        if item["type"] == "text":
            st.markdown(item["text"])
        elif item["type"] == "suggestions":
            with st.expander("Suggestions", expanded=True):
                for suggestion_index, suggestion in enumerate(item["suggestions"]):
                    if st.button(suggestion, key=f"{message_index}_{suggestion_index}"):
                        st.session_state.active_suggestion = suggestion
        elif item["type"] == "sql":
            with st.expander("SQL Query", expanded=False):
                st.code(item["statement"], language="sql")
            with st.expander("Results", expanded=True):
                with st.spinner("Running SQL..."):
                    df = pd.read_sql(item["statement"], st.session_state.CONN)
                    if len(df.index) > 1:
                        data_tab, line_tab, bar_tab = st.tabs(
                            ["Data", "Line Chart", "Bar Chart"]
                        )
                        data_tab.dataframe(df)
                        if len(df.columns) > 1:
                            df = df.set_index(df.columns[0])
                        with line_tab:
                            st.line_chart(df)
                        with bar_tab:
                            st.bar_chart(df)
                    else:
                        st.dataframe(df)

st.title("Cortex Analyst")
st.markdown(f"Semantic Model: `{FILE}`")

if "messages" not in st.session_state:
    st.session_state.messages = []
    st.session_state.suggestions = []
    st.session_state.active_suggestion = None

for message_index, message in enumerate(st.session_state.messages):
    with st.chat_message(message["role"]):
        display_content(
            content=message["content"],
            request_id=message.get("request_id"),
            message_index=message_index,
        )

if user_input := st.chat_input("What is your question?"):
    process_message(prompt=user_input)

if st.session_state.active_suggestion:
    process_message(prompt=st.session_state.active_suggestion)
    st.session_state.active_suggestion = None
Copy

Wenn Sie die App starten, werden Sie aufgefordert, eine Frage einzugeben. Beginnen Sie mit „Welche Fragen kann ich stellen?“ und probieren Sie einige der Vorschläge aus.

Schritt 4: Bereinigen

Bereinigen (optional)

Führen Sie die folgenden DROP <Objekt>-Befehle aus, um Ihr System in den Zustand zu versetzen, bevor Sie das Tutorial begonnen haben:

DROP DATABASE IF EXISTS cortex_analyst_demo;
DROP WAREHOUSE IF EXISTS cortex_analyst_wh;
Copy

Wenn Sie die Datenbank löschen, werden automatisch alle untergeordneten Datenbankobjekte wie Tabellen entfernt.

Nächste Schritte

Herzlichen Glückwunsch! Sie haben erfolgreich eine einfache Cortex Analyst-App erstellt, um in Snowflake „mit Ihren Daten zu sprechen“.

Zusätzliche Ressourcen

Setzen Sie die Einarbeitung mithilfe der folgenden Ressourcen fort: