자습서 3: Cortex Search를 사용하여 PDF 챗봇 개발¶
소개¶
이 자습서에서는 Cortex Search를 사용하여 PDF 문서 데이터 세트에서 챗봇을 구축하는 방법을 설명합니다. 자습서 2 에서는 이미 소스에서 추출한 텍스트 데이터로 챗봇을 구축하는 방법에 대해 알아봤습니다. 이 자습서에서는 기본 Python UDF를 사용하여 PDFs에서 해당 텍스트를 추출한 다음, 추출된 데이터를 Cortex Search Service로 수집하는 예제를 안내합니다.
알아볼 내용¶
Python UDF를 사용하여 스테이지의 PDF 파일 세트에서 텍스트를 추출합니다.
추출된 텍스트에서 Cortex Search Service를 생성합니다.
PDF 문서에서 추출한 데이터에 대해 질문할 수 있는 Streamlit-in-Snowflake 채팅 앱을 생성합니다.
전제 조건¶
이 자습서를 완료하려면 다음과 같은 필수 조건이 필요합니다.
데이터베이스, 테이블, 가상 웨어하우스 오브젝트, Cortex Search Service 및 Streamlit 앱을 만드는 데 필요한 권한을 부여하는 역할이 있는 Snowflake 계정과 사용자가 있습니다.
이러한 요구 사항을 충족하기 위한 지침은 20분만에 Snowflake 시작하기 섹션을 참조하십시오.
1단계: 설정¶
PDF 데이터 가져오기¶
이 자습서에서는 연방공개시장위원회(FOMC) 회의록의 샘플 데이터 세트를 사용합니다. 이는 2023년과 2024년 FOMC 미팅의 미팅 노트가 포함된 10페이지 분량의 문서 12개 샘플입니다. 다음 링크를 따라 브라우저에서 직접 파일을 다운로드합니다.
전체 FOMC 회의록 세트는 US Federal Reserve 웹사이트 에서 제공됩니다.
참고
자습서가 아닌 환경에서는 사용자의 데이터를 가져올 수 있으며, 이미 Snowflake 스테이지에 있을 수 있습니다.
데이터베이스, 테이블, 웨어하우스 만들기¶
다음 문을 실행하여 이 자습서에 필요한 데이터베이스와 가상 웨어하우스를 생성합니다. 자습서를 마친 후 이러한 오브젝트를 삭제할 수 있습니다.
CREATE DATABASE IF NOT EXISTS cortex_search_tutorial_db;
CREATE OR REPLACE WAREHOUSE cortex_search_tutorial_wh WITH
WAREHOUSE_SIZE='X-SMALL'
AUTO_SUSPEND = 120
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED=TRUE;
USE WAREHOUSE cortex_search_tutorial_wh;
참고
CREATE DATABASE
문으로 데이터베이스를 만듭니다. 데이터베이스에는 PUBLIC이라는 스키마가 자동으로 포함됩니다.CREATE WAREHOUSE
문으로 처음에 일시 중단되는 웨어하우스를 만듭니다.
2단계: Snowflake에 데이터 로드¶
우선, 데이터가 포함된 파일을 저장하기 위해 Snowflake 스테이지를 생성합니다. 이 스테이지에는 회의록 PDF 파일이 보관됩니다.
CREATE OR REPLACE STAGE cortex_search_tutorial_db.public.fomc
DIRECTORY = (ENABLE = TRUE)
ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
참고
디렉터리와 암호화는 파일에 대한 presigned_url을 생성하도록 구성되어 있습니다. presigned_url을 생성할 필요가 없으면 이러한 구성을 건너뛸 수 있습니다.
이제 데이터 세트를 업로드합니다. 데이터 세트는 Snowsight 에서 업로드하거나 SQL을 사용하여 업로드할 수 있습니다. Snowsight 에서 업로드하려면:
Snowsight 에 로그인합니다.
왼쪽 탐색 메뉴에서 Data 를 선택합니다.
cortex_search_tutorial_db
데이터베이스를 선택합니다.스키마
public
을 선택합니다.Stages 를 선택하고
fomc
를 선택합니다.오른쪽 상단에서 + Files 버튼을 선택합니다.
UI에 파일을 끌어서 놓거나 Browse 를 선택하여 대화 창에서 파일을 선택합니다.
Upload 를 선택하여 파일을 업로드합니다.
3단계: PDF 파일 구문 분석¶
다음을 수행하는 전처리 함수를 생성합니다.
PDF 파일을 구문 분석하고 텍스트를 추출합니다.
인덱싱을 위해 텍스트를 작은 청크로 나눕니다.
CREATE OR REPLACE FUNCTION cortex_search_tutorial_db.public.pdf_text_chunker(file_url STRING)
RETURNS TABLE (chunk VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
HANDLER = 'pdf_text_chunker'
PACKAGES = ('snowflake-snowpark-python', 'PyPDF2', 'langchain')
AS
$$
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging
import pandas as pd
class pdf_text_chunker:
def read_pdf(self, file_url: str) -> str:
logger = logging.getLogger("udf_logger")
logger.info(f"Opening file {file_url}")
with SnowflakeFile.open(file_url, 'rb') as f:
buffer = io.BytesIO(f.readall())
reader = PyPDF2.PdfReader(buffer)
text = ""
for page in reader.pages:
try:
text += page.extract_text().replace('\n', ' ').replace('\0', ' ')
except:
text = "Unable to Extract"
logger.warn(f"Unable to extract from file {file_url}, page {page}")
return text
def process(self, file_url: str):
text = self.read_pdf(file_url)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size = 2000, # Adjust this as needed
chunk_overlap = 300, # Overlap to keep chunks contextual
length_function = len
)
chunks = text_splitter.split_text(text)
df = pd.DataFrame(chunks, columns=['chunk'])
yield from df.itertuples(index=False, name=None)
$$;
그런 다음 PDF 파일에서 구문 분석된 데이터를 보관할 테이블을 만듭니다.
CREATE OR REPLACE TABLE cortex_search_tutorial_db.public.docs_chunks_table AS
SELECT
relative_path,
build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path) AS file_url,
-- preserve file title information by concatenating relative_path with the chunk
CONCAT(relative_path, ': ', func.chunk) AS chunk,
'English' AS language
FROM
directory(@cortex_search_tutorial_db.public.fomc),
TABLE(cortex_search_tutorial_db.public.pdf_text_chunker(build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path))) AS func;
4단계: 검색 서비스 만들기¶
다음 SQL 명령을 실행하여 새 테이블에 대한 검색 서비스를 생성합니다.
CREATE OR REPLACE CORTEX SEARCH SERVICE cortex_search_tutorial_db.public.fomc_meeting
ON chunk
ATTRIBUTES language
WAREHOUSE = cortex_search_tutorial_wh
TARGET_LAG = '1 hour'
AS (
SELECT
chunk,
relative_path,
file_url,
language
FROM cortex_search_tutorial_db.public.docs_chunks_table
);
이 명령은 검색 결과를 필터링할 수 있는 열인 attributes
와 웨어하우스 및 목표 지연 시간을 지정합니다. 검색 열은 소스 쿼리에서 기본 테이블에 있는 여러 텍스트 열의 연결로 생성되는 chunk
로 지정됩니다. 소스 쿼리의 다른 열은 검색 요청에 대한 응답으로 포함될 수 있습니다.
5단계: Streamlit 앱 만들기¶
Python SDK로 서비스를 쿼리(snowflake
Python 패키지 사용)할 수 있습니다. 이 자습서에서는 Streamlit in Snowflake 애플리케이션에서 Python SDK를 사용하는 방법을 보여줍니다.
먼저 서비스 생성 단계에서 전역 Snowsight UI 역할이 서비스를 생성하는 데 사용된 역할과 동일한지 확인합니다.
Snowsight 에 로그인합니다.
왼쪽 탐색 메뉴에서 Projects » Streamlit 를 선택합니다.
+ Streamlit App 를 선택합니다.
중요: 앱 위치에 대한
cortex_search_tutorial_db
데이터베이스와public
스키마를 선택합니다.Streamlit in Snowflake 편집기의 왼쪽 창에서 Packages 를 선택하고
snowflake
(버전 >= 0.8.0) 및snowflake-ml-python
을 추가하여 애플리케이션에 필수 패키지를 설치합니다.예제 애플리케이션 코드를 다음 Streamlit 앱으로 바꿉니다.
import streamlit as st from snowflake.core import Root # requires snowflake>=0.8.0 from snowflake.cortex import Complete from snowflake.snowpark.context import get_active_session MODELS = [ "mistral-large", "snowflake-arctic", "llama3-70b", "llama3-8b", ] def init_messages(): """ Initialize the session state for chat messages. If the session state indicates that the conversation should be cleared or if the "messages" key is not in the session state, initialize it as an empty list. """ if st.session_state.clear_conversation or "messages" not in st.session_state: st.session_state.messages = [] def init_service_metadata(): """ Initialize the session state for cortex search service metadata. Query the available cortex search services from the Snowflake session and store their names and search columns in the session state. """ if "service_metadata" not in st.session_state: services = session.sql("SHOW CORTEX SEARCH SERVICES;").collect() service_metadata = [] if services: for s in services: svc_name = s["name"] svc_search_col = session.sql( f"DESC CORTEX SEARCH SERVICE {svc_name};" ).collect()[0]["search_column"] service_metadata.append( {"name": svc_name, "search_column": svc_search_col} ) st.session_state.service_metadata = service_metadata def init_config_options(): """ Initialize the configuration options in the Streamlit sidebar. Allow the user to select a cortex search service, clear the conversation, toggle debug mode, and toggle the use of chat history. Also provide advanced options to select a model, the number of context chunks, and the number of chat messages to use in the chat history. """ st.sidebar.selectbox( "Select cortex search service:", [s["name"] for s in st.session_state.service_metadata], key="selected_cortex_search_service", ) st.sidebar.button("Clear conversation", key="clear_conversation") st.sidebar.toggle("Debug", key="debug", value=False) st.sidebar.toggle("Use chat history", key="use_chat_history", value=True) with st.sidebar.expander("Advanced options"): st.selectbox("Select model:", MODELS, key="model_name") st.number_input( "Select number of context chunks", value=5, key="num_retrieved_chunks", min_value=1, max_value=10, ) st.number_input( "Select number of messages to use in chat history", value=5, key="num_chat_messages", min_value=1, max_value=10, ) st.sidebar.expander("Session State").write(st.session_state) def query_cortex_search_service(query, columns = [], filter={}): """ Query the selected cortex search service with the given query and retrieve context documents. Display the retrieved context documents in the sidebar if debug mode is enabled. Return the context documents as a string. Args: query (str): The query to search the cortex search service with. Returns: str: The concatenated string of context documents. """ db, schema = session.get_current_database(), session.get_current_schema() cortex_search_service = ( root.databases[db] .schemas[schema] .cortex_search_services[st.session_state.selected_cortex_search_service] ) context_documents = cortex_search_service.search( query, columns=columns, filter=filter, limit=st.session_state.num_retrieved_chunks ) results = context_documents.results service_metadata = st.session_state.service_metadata search_col = [s["search_column"] for s in service_metadata if s["name"] == st.session_state.selected_cortex_search_service][0].lower() context_str = "" for i, r in enumerate(results): context_str += f"Context document {i+1}: {r[search_col]} \n" + "\n" if st.session_state.debug: st.sidebar.text_area("Context documents", context_str, height=500) return context_str, results def get_chat_history(): """ Retrieve the chat history from the session state limited to the number of messages specified by the user in the sidebar options. Returns: list: The list of chat messages from the session state. """ start_index = max( 0, len(st.session_state.messages) - st.session_state.num_chat_messages ) return st.session_state.messages[start_index : len(st.session_state.messages) - 1] def complete(model, prompt): """ Generate a completion for the given prompt using the specified model. Args: model (str): The name of the model to use for completion. prompt (str): The prompt to generate a completion for. Returns: str: The generated completion. """ return Complete(model, prompt).replace("$", "\$") def make_chat_history_summary(chat_history, question): """ Generate a summary of the chat history combined with the current question to extend the query context. Use the language model to generate this summary. Args: chat_history (str): The chat history to include in the summary. question (str): The current user question to extend with the chat history. Returns: str: The generated summary of the chat history and question. """ prompt = f""" [INST] Based on the chat history below and the question, generate a query that extend the question with the chat history provided. The query should be in natural language. Answer with only the query. Do not add any explanation. <chat_history> {chat_history} </chat_history> <question> {question} </question> [/INST] """ summary = complete(st.session_state.model_name, prompt) if st.session_state.debug: st.sidebar.text_area( "Chat history summary", summary.replace("$", "\$"), height=150 ) return summary def create_prompt(user_question): """ Create a prompt for the language model by combining the user question with context retrieved from the cortex search service and chat history (if enabled). Format the prompt according to the expected input format of the model. Args: user_question (str): The user's question to generate a prompt for. Returns: str: The generated prompt for the language model. """ if st.session_state.use_chat_history: chat_history = get_chat_history() if chat_history != []: question_summary = make_chat_history_summary(chat_history, user_question) prompt_context, results = query_cortex_search_service( question_summary, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) else: prompt_context, results = query_cortex_search_service( user_question, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) else: prompt_context, results = query_cortex_search_service( user_question, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) chat_history = "" prompt = f""" [INST] You are a helpful AI chat assistant with RAG capabilities. When a user asks you a question, you will also be given context provided between <context> and </context> tags. Use that context with the user's chat history provided in the between <chat_history> and </chat_history> tags to provide a summary that addresses the user's question. Ensure the answer is coherent, concise, and directly relevant to the user's question. If the user asks a generic question which cannot be answered with the given context or chat_history, just say "I don't know the answer to that question. Don't saying things like "according to the provided context". <chat_history> {chat_history} </chat_history> <context> {prompt_context} </context> <question> {user_question} </question> [/INST] Answer: """ return prompt, results def main(): st.title(f":speech_balloon: Chatbot with Snowflake Cortex") init_service_metadata() init_config_options() init_messages() icons = {"assistant": "❄️", "user": "👤"} # Display chat messages from history on app rerun for message in st.session_state.messages: with st.chat_message(message["role"], avatar=icons[message["role"]]): st.markdown(message["content"]) disable_chat = ( "service_metadata" not in st.session_state or len(st.session_state.service_metadata) == 0 ) if question := st.chat_input("Ask a question...", disabled=disable_chat): # Add user message to chat history st.session_state.messages.append({"role": "user", "content": question}) # Display user message in chat message container with st.chat_message("user", avatar=icons["user"]): st.markdown(question.replace("$", "\$")) # Display assistant response in chat message container with st.chat_message("assistant", avatar=icons["assistant"]): message_placeholder = st.empty() question = question.replace("'", "") prompt, results = create_prompt(question) with st.spinner("Thinking..."): generated_response = complete( st.session_state.model_name, prompt ) # build references table for citation markdown_table = "###### References \n\n| PDF Title | URL |\n|-------|-----|\n" for ref in results: markdown_table += f"| {ref['relative_path']} | [Link]({ref['file_url']}) |\n" message_placeholder.markdown(generated_response + "\n\n" + markdown_table) st.session_state.messages.append( {"role": "assistant", "content": generated_response} ) if __name__ == "__main__": session = get_active_session() root = Root(session) main()
6단계: 앱 사용해 보기¶
Streamlit in Snowflake 편집기 창의 오른쪽 창에서 Streamlit 앱의 미리 보기를 볼 수 있습니다. 미리 보기는 다음 스크린샷과 유사해야 합니다.

텍스트 상자에 질문을 입력하여 새 앱을 사용해 봅니다. 시도해 볼 수 있는 몇 가지 샘플 쿼리는 다음과 같습니다.
- 예제 세션 1: 질문과 답변을 여러 차례 주고받기
How was gpd growth in q4 23?
How was unemployment in the same quarter?
- 예제 세션 2: 여러 문서 요약
How has the fed's view of the market change over the course of 2024?
- 예제 세션 3: 문서에 정답이 포함되어 있지 않은 경우 기권하기
What was janet yellen's opinion about 2024 q1?
7단계: 정리¶
정리(선택 사항)¶
자습서를 시작하기 전의 상태로 시스템을 되돌리려면 다음 DROP <오브젝트> 명령을 실행합니다.
DROP DATABASE IF EXISTS cortex_search_tutorial_db;
DROP WAREHOUSE IF EXISTS cortex_search_tutorial_wh;
데이터베이스를 삭제하면 테이블과 같은 모든 하위 데이터베이스 오브젝트가 자동으로 제거됩니다.
다음 단계¶
축하합니다! Snowflake의 PDF 파일 세트에서 검색 앱을 성공적으로 빌드했습니다.
추가 리소스¶
다음 리소스를 사용하여 계속 학습할 수 있습니다.