チュートリアル: Cortex Analystで時系列収益データに関する質問に答える

概要

Cortex Analyst は、 SQL クエリを生成して実行することにより、データに関する自然言語による質問を結果に変換します。このチュートリアルでは、時系列の収益データセットに関する質問に回答するために Cortex Analyst を設定する方法について説明します。

学習内容

  • データセットのセマンティックモデルを確立する。

  • Cortex Analyst をクエリするStreamlitアプリを作成する。

前提条件

このチュートリアルを完了するには、以下の前提条件が必要です。

  • Snowflakeアカウントと、データベース、スキーマ、テーブル、ステージ、および仮想ウェアハウスオブジェクトを作成するために必要な権限を付与するロールを持つユーザーがあります。

  • Streamlit をローカルシステムに設定しました。

これらの要件を満たす手順については、 Snowflakeを20分で紹介 をご参照ください。

ステップ1: 設定する

サンプルデータを取得する

Google Driveから ダウンロードしたサンプルデータセットと対応するセマンティックモデルを使用します。以下のデータファイルをシステムにダウンロードします。

  • daily_revenue_combined.csv

  • daily_revenue_by_region_combined.csv

  • daily_revenue_by_product_combined.csv

セマンティックモデルもダウンロードします。

  • revenue_timeseries.yaml

データのセマンティックモデルを含む YAML ファイルを見るために、少し時間を取ることをおすすめします。セマンティックモデルは、各テーブルの SQL スキーマを補足し、 Cortex Analyst がデータに関する質問を理解するのに役立つ追加情報を提供します。詳細については、 Cortex Analyst セマンティックモデル仕様 をご参照ください。

注釈

チュートリアル以外の設定では、自分のデータ(おそらくすでにSnowflakeテーブルにある)を持ち込み、自身のセマンティックモデルを開発します。

Snowflakeオブジェクトを作成する

このチュートリアルに必要なSnowflakeオブジェクトを作成するには、Snowflake UI の Snowsight を使用します。チュートリアルの完了後に、これらのオブジェクトをドロップできます。

注釈

データベース、スキーマ、ウェアハウス、ステージ、およびテーブルを作成できるロールを使用します。

オブジェクトを作成するには:

  1. Snowsight ユーザーインターフェースで、左のナビゲーションバーにある Worksheets を選択し、 + ボタンを選択します。新しい SQL ワークシートが表示されます。

  2. 以下の SQL コードをワークシートに貼り付け、ワークシート右上のドロップダウンメニューから Run All を選択します。

CREATE OR REPLACE DATABASE cortex_analyst_demo;

CREATE OR REPLACE SCHEMA revenue_timeseries;

CREATE OR REPLACE WAREHOUSE cortex_analyst_wh
    WAREHOUSE_SIZE = 'large'
    WAREHOUSE_TYPE = 'standard'
    AUTO_SUSPEND = 60
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = TRUE
COMMENT = 'warehouse for cortex analyst demo';

USE WAREHOUSE cortex_analyst_wh;

CREATE STAGE raw_data DIRECTORY = (ENABLE = TRUE);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE (
    DATE DATE,
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT (
    DATE DATE,
    PRODUCT_LINE VARCHAR(16777216),
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION (
    DATE DATE,
    SALES_REGION VARCHAR(16777216),
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);
Copy

上記の SQL は、以下のオブジェクトを作成します。

  • cortex_analyst_demo というデータベース

  • revenue_timeseries というデータベース内のスキーマ

  • そのスキーマにある DAILY_REVENUE、 DAILY_REVENUE_BY_PRODUCT、および DAILY_REVENUE_BY_REGION の3つのテーブル

  • これらのテーブルにロードする生データを保持する raw_data という名前のステージ

  • cortex_analyst_wh という仮想ウェアハウス

なお、仮想ウェアハウスは最初は一時停止しており、必要なときに自動的に開始されます。

ステップ2: データをSnowflakeにロードする

CSV ファイルからSnowflakeにデータを取り込むには、それらをステージにアップロードし、ステージからテーブルにデータをロードします。同時に、後のステップで使用するセマンティックモデル YAML ファイルをアップロードします。

アップロードするファイルは以下の通りです。

  • daily_revenue_combined.csv

  • daily_revenue_by_region_combined.csv

  • daily_revenue_by_product_combined.csv

  • revenue_timeseries.yaml

Snowsight にファイルをアップロードするには:

  1. Snowsight UI で、左ナビゲーションバーの Data icon を選択し、次に Add Data を選択します。 Add Data ページで、 Load files into a stage を選択します。

  2. 前のステップでダウンロードした4つのファイルを、 Snowsight ウィンドウにドラッグします。

  3. データベース cortex_analyst_demo とステージ raw_data を選択し、 Upload ボタンを選択してファイルをアップロードします。

ファイルをアップロードしたので、 Snowsight ワークシートで以下の SQL コマンドを実行して、 CSV ファイルからデータをロードします。

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE
FROM @raw_data
FILES = ('daily_revenue_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT
FROM @raw_data
FILES = ('daily_revenue_by_product_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION
FROM @raw_data
FILES = ('daily_revenue_by_region_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
Copy

ステップ3: Streamlitアプリを作成し、 Cortex Analyst を通じてデータ通信を行います。

Cortex Analyst を使用するStreamlitアプリを作成するには:

  1. analyst_demo.py というPythonファイルをローカルに作成します。

  2. 以下のコードをファイルにコピーします。

  3. プレースホルダーの値をあなたのアカウント詳細に置き換えます。

  4. streamlit run analyst_demo.py を使用してStreamlitアプリを実行します。

from typing import Any, Dict, List, Optional

import pandas as pd
import requests
import snowflake.connector
import streamlit as st

DATABASE = "CORTEX_ANALYST_DEMO"
SCHEMA = "REVENUE_TIMESERIES"
STAGE = "RAW_DATA"
FILE = "revenue_timeseries.yaml"
WAREHOUSE = "cortex_analyst_wh"

# replace values below with your Snowflake connection information
HOST = "<host>"
ACCOUNT = "<account>"
USER = "<user>"
PASSWORD = "<password>"
ROLE = "<role>"

if 'CONN' not in st.session_state or st.session_state.CONN is None:
    st.session_state.CONN = snowflake.connector.connect(
        user=USER,
        password=PASSWORD,
        account=ACCOUNT,
        host=HOST,
        port=443,
        warehouse=WAREHOUSE,
        role=ROLE,
    )

def send_message(prompt: str) -> Dict[str, Any]:
    """Calls the REST API and returns the response."""
    request_body = {
        "messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}],
        "semantic_model_file": f"@{DATABASE}.{SCHEMA}.{STAGE}/{FILE}",
    }
    resp = requests.post(
        url=f"https://{HOST}/api/v2/cortex/analyst/message",
        json=request_body,
        headers={
            "Authorization": f'Snowflake Token="{st.session_state.CONN.rest.token}"',
            "Content-Type": "application/json",
        },
    )
    request_id = resp.headers.get("X-Snowflake-Request-Id")
    if resp.status_code < 400:
        return {**resp.json(), "request_id": request_id}  # type: ignore[arg-type]
    else:
        raise Exception(
            f"Failed request (id: {request_id}) with status {resp.status_code}: {resp.text}"
        )

def process_message(prompt: str) -> None:
    """Processes a message and adds the response to the chat."""
    st.session_state.messages.append(
        {"role": "user", "content": [{"type": "text", "text": prompt}]}
    )
    with st.chat_message("user"):
        st.markdown(prompt)
    with st.chat_message("assistant"):
        with st.spinner("Generating response..."):
            response = send_message(prompt=prompt)
            request_id = response["request_id"]
            content = response["message"]["content"]
            display_content(content=content, request_id=request_id)  # type: ignore[arg-type]
    st.session_state.messages.append(
        {"role": "assistant", "content": content, "request_id": request_id}
    )

def display_content(
    content: List[Dict[str, str]],
    request_id: Optional[str] = None,
    message_index: Optional[int] = None,
) -> None:
    """Displays a content item for a message."""
    message_index = message_index or len(st.session_state.messages)
    if request_id:
        with st.expander("Request ID", expanded=False):
            st.markdown(request_id)
    for item in content:
        if item["type"] == "text":
            st.markdown(item["text"])
        elif item["type"] == "suggestions":
            with st.expander("Suggestions", expanded=True):
                for suggestion_index, suggestion in enumerate(item["suggestions"]):
                    if st.button(suggestion, key=f"{message_index}_{suggestion_index}"):
                        st.session_state.active_suggestion = suggestion
        elif item["type"] == "sql":
            with st.expander("SQL Query", expanded=False):
                st.code(item["statement"], language="sql")
            with st.expander("Results", expanded=True):
                with st.spinner("Running SQL..."):
                    df = pd.read_sql(item["statement"], st.session_state.CONN)
                    if len(df.index) > 1:
                        data_tab, line_tab, bar_tab = st.tabs(
                            ["Data", "Line Chart", "Bar Chart"]
                        )
                        data_tab.dataframe(df)
                        if len(df.columns) > 1:
                            df = df.set_index(df.columns[0])
                        with line_tab:
                            st.line_chart(df)
                        with bar_tab:
                            st.bar_chart(df)
                    else:
                        st.dataframe(df)

st.title("Cortex Analyst")
st.markdown(f"Semantic Model: `{FILE}`")

if "messages" not in st.session_state:
    st.session_state.messages = []
    st.session_state.suggestions = []
    st.session_state.active_suggestion = None

for message_index, message in enumerate(st.session_state.messages):
    with st.chat_message(message["role"]):
        display_content(
            content=message["content"],
            request_id=message.get("request_id"),
            message_index=message_index,
        )

if user_input := st.chat_input("What is your question?"):
    process_message(prompt=user_input)

if st.session_state.active_suggestion:
    process_message(prompt=st.session_state.active_suggestion)
    st.session_state.active_suggestion = None
Copy

アプリを起動すると、質問を入力するよう促されます。「どんな質問ができますか?」から始め、その提案のいくつかを試してみます。

ステップ4: クリーンアップする

クリーンアップ(オプション)

次の DROP <オブジェクト> コマンドを実行して、システムをチュートリアルを開始する前の状態に戻します。

DROP DATABASE IF EXISTS cortex_analyst_demo;
DROP WAREHOUSE IF EXISTS cortex_analyst_wh;
Copy

データベースをドロップすると、テーブルなどのすべての子データベースオブジェクトが自動的に削除されます。

次のステップ

おめでとうございます。あなたはSnowflakeで「データと会話する」ためのシンプルな Cortex Analyst アプリを構築することに成功しました。

追加のリソース

以下のリソースを使って学習を続行します。