チュートリアル: Cortex Analystで時系列収益データに関する質問に答える¶
概要¶
Cortex Analyst は、 SQL クエリを生成して実行することにより、データに関する自然言語による質問を結果に変換します。このチュートリアルでは、時系列の収益データセットに関する質問に回答するために Cortex Analyst を設定する方法について説明します。
学習内容¶
データセットのセマンティックモデルを確立する。
Cortex Analyst をクエリするStreamlitアプリを作成する。
前提条件¶
このチュートリアルを完了するには、以下の前提条件が必要です。
Snowflakeアカウントと、データベース、スキーマ、テーブル、ステージ、および仮想ウェアハウスオブジェクトを作成するために必要な権限を付与するロールを持つユーザーがあります。
Streamlit をローカルシステムに設定しました。
これらの要件を満たす手順については、 Snowflakeを20分で紹介 をご参照ください。
ステップ1: 設定する¶
サンプルデータを取得する¶
Google Driveから ダウンロードしたサンプルデータセットと対応するセマンティックモデルを使用します。以下のデータファイルをシステムにダウンロードします。
daily_revenue_combined.csv
daily_revenue_by_region_combined.csv
daily_revenue_by_product_combined.csv
セマンティックモデルもダウンロードします。
revenue_timeseries.yaml
データのセマンティックモデルを含む YAML ファイルを見るために、少し時間を取ることをおすすめします。セマンティックモデルは、各テーブルの SQL スキーマを補足し、 Cortex Analyst がデータに関する質問を理解するのに役立つ追加情報を提供します。詳細については、 Cortex Analyst セマンティックモデル仕様 をご参照ください。
注釈
チュートリアル以外の設定では、自分のデータ(おそらくすでにSnowflakeテーブルにある)を持ち込み、自身のセマンティックモデルを開発します。
Snowflakeオブジェクトを作成する¶
このチュートリアルに必要なSnowflakeオブジェクトを作成するには、Snowflake UI の Snowsight を使用します。チュートリアルの完了後に、これらのオブジェクトをドロップできます。
注釈
データベース、スキーマ、ウェアハウス、ステージ、およびテーブルを作成できるロールを使用します。
オブジェクトを作成するには:
Snowsight ユーザーインターフェースで、左のナビゲーションバーにある Worksheets を選択し、 + ボタンを選択します。新しい SQL ワークシートが表示されます。
以下の SQL コードをワークシートに貼り付け、ワークシート右上のドロップダウンメニューから Run All を選択します。
CREATE OR REPLACE DATABASE cortex_analyst_demo;
CREATE OR REPLACE SCHEMA revenue_timeseries;
CREATE OR REPLACE WAREHOUSE cortex_analyst_wh
WAREHOUSE_SIZE = 'large'
WAREHOUSE_TYPE = 'standard'
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE
COMMENT = 'warehouse for cortex analyst demo';
USE WAREHOUSE cortex_analyst_wh;
CREATE STAGE raw_data DIRECTORY = (ENABLE = TRUE);
CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE (
DATE DATE,
REVENUE FLOAT,
COGS FLOAT,
FORECASTED_REVENUE FLOAT
);
CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT (
DATE DATE,
PRODUCT_LINE VARCHAR(16777216),
REVENUE FLOAT,
COGS FLOAT,
FORECASTED_REVENUE FLOAT
);
CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION (
DATE DATE,
SALES_REGION VARCHAR(16777216),
REVENUE FLOAT,
COGS FLOAT,
FORECASTED_REVENUE FLOAT
);
上記の SQL は、以下のオブジェクトを作成します。
cortex_analyst_demo
というデータベースrevenue_timeseries
というデータベース内のスキーマそのスキーマにある DAILY_REVENUE、 DAILY_REVENUE_BY_PRODUCT、および DAILY_REVENUE_BY_REGION の3つのテーブル
これらのテーブルにロードする生データを保持する
raw_data
という名前のステージcortex_analyst_wh
という仮想ウェアハウス
なお、仮想ウェアハウスは最初は一時停止しており、必要なときに自動的に開始されます。
ステップ2: データをSnowflakeにロードする¶
CSV ファイルからSnowflakeにデータを取り込むには、それらをステージにアップロードし、ステージからテーブルにデータをロードします。同時に、後のステップで使用するセマンティックモデル YAML ファイルをアップロードします。
アップロードするファイルは以下の通りです。
daily_revenue_combined.csv
daily_revenue_by_region_combined.csv
daily_revenue_by_product_combined.csv
revenue_timeseries.yaml
Snowsight にファイルをアップロードするには:
Snowsight UI で、左ナビゲーションバーの Data icon を選択し、次に Add Data を選択します。 Add Data ページで、 Load files into a stage を選択します。
前のステップでダウンロードした4つのファイルを、 Snowsight ウィンドウにドラッグします。
データベース
cortex_analyst_demo
とステージraw_data
を選択し、 Upload ボタンを選択してファイルをアップロードします。
ファイルをアップロードしたので、 Snowsight ワークシートで以下の SQL コマンドを実行して、 CSV ファイルからデータをロードします。
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE
FROM @raw_data
FILES = ('daily_revenue_combined.csv')
FILE_FORMAT = (
TYPE=CSV,
SKIP_HEADER=1,
FIELD_DELIMITER=',',
TRIM_SPACE=FALSE,
FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
REPLACE_INVALID_CHARACTERS=TRUE,
DATE_FORMAT=AUTO,
TIME_FORMAT=AUTO,
TIMESTAMP_FORMAT=AUTO
EMPTY_FIELD_AS_NULL = FALSE
error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT
FROM @raw_data
FILES = ('daily_revenue_by_product_combined.csv')
FILE_FORMAT = (
TYPE=CSV,
SKIP_HEADER=1,
FIELD_DELIMITER=',',
TRIM_SPACE=FALSE,
FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
REPLACE_INVALID_CHARACTERS=TRUE,
DATE_FORMAT=AUTO,
TIME_FORMAT=AUTO,
TIMESTAMP_FORMAT=AUTO
EMPTY_FIELD_AS_NULL = FALSE
error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION
FROM @raw_data
FILES = ('daily_revenue_by_region_combined.csv')
FILE_FORMAT = (
TYPE=CSV,
SKIP_HEADER=1,
FIELD_DELIMITER=',',
TRIM_SPACE=FALSE,
FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
REPLACE_INVALID_CHARACTERS=TRUE,
DATE_FORMAT=AUTO,
TIME_FORMAT=AUTO,
TIMESTAMP_FORMAT=AUTO
EMPTY_FIELD_AS_NULL = FALSE
error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
ステップ3: Streamlitアプリを作成し、 Cortex Analyst を通じてデータ通信を行います。¶
Cortex Analyst を使用するStreamlitアプリを作成するには:
analyst_demo.py
というPythonファイルをローカルに作成します。以下のコードをファイルにコピーします。
プレースホルダーの値をあなたのアカウント詳細に置き換えます。
streamlit run analyst_demo.py
を使用してStreamlitアプリを実行します。
from typing import Any, Dict, List, Optional
import pandas as pd
import requests
import snowflake.connector
import streamlit as st
DATABASE = "CORTEX_ANALYST_DEMO"
SCHEMA = "REVENUE_TIMESERIES"
STAGE = "RAW_DATA"
FILE = "revenue_timeseries.yaml"
WAREHOUSE = "cortex_analyst_wh"
# replace values below with your Snowflake connection information
HOST = "<host>"
ACCOUNT = "<account>"
USER = "<user>"
PASSWORD = "<password>"
ROLE = "<role>"
if 'CONN' not in st.session_state or st.session_state.CONN is None:
st.session_state.CONN = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
host=HOST,
port=443,
warehouse=WAREHOUSE,
role=ROLE,
)
def send_message(prompt: str) -> Dict[str, Any]:
"""Calls the REST API and returns the response."""
request_body = {
"messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}],
"semantic_model_file": f"@{DATABASE}.{SCHEMA}.{STAGE}/{FILE}",
}
resp = requests.post(
url=f"https://{HOST}/api/v2/cortex/analyst/message",
json=request_body,
headers={
"Authorization": f'Snowflake Token="{st.session_state.CONN.rest.token}"',
"Content-Type": "application/json",
},
)
request_id = resp.headers.get("X-Snowflake-Request-Id")
if resp.status_code < 400:
return {**resp.json(), "request_id": request_id} # type: ignore[arg-type]
else:
raise Exception(
f"Failed request (id: {request_id}) with status {resp.status_code}: {resp.text}"
)
def process_message(prompt: str) -> None:
"""Processes a message and adds the response to the chat."""
st.session_state.messages.append(
{"role": "user", "content": [{"type": "text", "text": prompt}]}
)
with st.chat_message("user"):
st.markdown(prompt)
with st.chat_message("assistant"):
with st.spinner("Generating response..."):
response = send_message(prompt=prompt)
request_id = response["request_id"]
content = response["message"]["content"]
display_content(content=content, request_id=request_id) # type: ignore[arg-type]
st.session_state.messages.append(
{"role": "assistant", "content": content, "request_id": request_id}
)
def display_content(
content: List[Dict[str, str]],
request_id: Optional[str] = None,
message_index: Optional[int] = None,
) -> None:
"""Displays a content item for a message."""
message_index = message_index or len(st.session_state.messages)
if request_id:
with st.expander("Request ID", expanded=False):
st.markdown(request_id)
for item in content:
if item["type"] == "text":
st.markdown(item["text"])
elif item["type"] == "suggestions":
with st.expander("Suggestions", expanded=True):
for suggestion_index, suggestion in enumerate(item["suggestions"]):
if st.button(suggestion, key=f"{message_index}_{suggestion_index}"):
st.session_state.active_suggestion = suggestion
elif item["type"] == "sql":
with st.expander("SQL Query", expanded=False):
st.code(item["statement"], language="sql")
with st.expander("Results", expanded=True):
with st.spinner("Running SQL..."):
df = pd.read_sql(item["statement"], st.session_state.CONN)
if len(df.index) > 1:
data_tab, line_tab, bar_tab = st.tabs(
["Data", "Line Chart", "Bar Chart"]
)
data_tab.dataframe(df)
if len(df.columns) > 1:
df = df.set_index(df.columns[0])
with line_tab:
st.line_chart(df)
with bar_tab:
st.bar_chart(df)
else:
st.dataframe(df)
st.title("Cortex Analyst")
st.markdown(f"Semantic Model: `{FILE}`")
if "messages" not in st.session_state:
st.session_state.messages = []
st.session_state.suggestions = []
st.session_state.active_suggestion = None
for message_index, message in enumerate(st.session_state.messages):
with st.chat_message(message["role"]):
display_content(
content=message["content"],
request_id=message.get("request_id"),
message_index=message_index,
)
if user_input := st.chat_input("What is your question?"):
process_message(prompt=user_input)
if st.session_state.active_suggestion:
process_message(prompt=st.session_state.active_suggestion)
st.session_state.active_suggestion = None
アプリを起動すると、質問を入力するよう促されます。「どんな質問ができますか?」から始め、その提案のいくつかを試してみます。
ステップ4: クリーンアップする¶
クリーンアップ(オプション)¶
次の DROP <オブジェクト> コマンドを実行して、システムをチュートリアルを開始する前の状態に戻します。
DROP DATABASE IF EXISTS cortex_analyst_demo;
DROP WAREHOUSE IF EXISTS cortex_analyst_wh;
データベースをドロップすると、テーブルなどのすべての子データベースオブジェクトが自動的に削除されます。
次のステップ¶
おめでとうございます。あなたはSnowflakeで「データと会話する」ためのシンプルな Cortex Analyst アプリを構築することに成功しました。
追加のリソース¶
以下のリソースを使って学習を続行します。