Tutorial 3: Einen PDF-Chatbot mit Cortex Search erstellen¶
Einführung¶
Dieses Tutorial beschreibt, wie Sie mit Cortex Search einen Chatbot aus einem Datenset von PDF-Dokumenten erstellen. In Tutorial 2 haben Sie gelernt, wie man einen Chatbot aus Textdaten erstellt, die bereits aus der Quelle extrahiert wurden. Dieses Tutorial führt Sie durch ein Beispiel für das Extrahieren dieses Textes aus PDFs mithilfe einer einfachen Python-UDF und das anschließende Einspeisen der extrahierten Daten in einen Cortex Search Service.
Lerninhalte¶
Extrahieren von Text aus einer Gruppe von PDF-Dateien in einen Stagingbereich mithilfe einer Python-UDF.
Erstellen eines Cortex Search Service aus dem extrahierten Text.
Erstellen einer Streamlit-in-Snowflake-Chat-App, mit der Sie Fragen zu den aus den PDF-Dokumenten extrahierten Daten stellen können.
Voraussetzungen¶
Die folgenden Voraussetzungen müssen erfüllt sein, um dieses Tutorial abzuschließen:
Sie haben ein Snowflake-Konto sowie einen Benutzer mit einer Rolle, die die erforderlichen Berechtigungen zum Erstellen einer Datenbank sowie Tabellen, virtuellen Warehouse-Objekten, Cortex Search Services und Streamlit-Apps erteilt.
Eine Anleitung, wie Sie diese Anforderungen erfüllen können, finden Sie unter Snowflake in 20 Minuten.
Schritt 1: Setup¶
Rufen Sie die PDF-Daten ab¶
Für dieses Tutorial verwenden Sie ein Beispiel-Datenset der Sitzungsprotokolle des Federal Open Market Committee (FOMC). Dies ist ein Beispiel mit 12 10-seitigen Dokumenten mit Sitzungsnotizen von FOMC-Sitzungen aus den Jahren 2023 und 2024. Laden Sie die Dateien direkt von Ihrem Browser herunter, indem Sie diesem Link folgen:
Beispiel mit FOMC-Sitzungsprotokollen
Die vollständige Gruppe der FOMC-Sitzungsprotokolle finden Sie auf der Website der Federal Reserve US.
Bemerkung
Außerhalb einer Tutorial-Umgebung würden Sie Ihre eigenen Daten mitbringen, möglicherweise bereits in einem Snowflake-Stagingbereich.
Datenbank, Tabellen und Warehouse erstellen¶
Führen Sie die folgenden Anweisungen aus, um eine Datenbank und ein virtuelles Warehouse zu erstellen, die für dieses Tutorial benötigt werden. Nachdem Sie das Tutorial abgeschlossen haben, können Sie diese Objekte wieder löschen.
CREATE DATABASE IF NOT EXISTS cortex_search_tutorial_db;
CREATE OR REPLACE WAREHOUSE cortex_search_tutorial_wh WITH
WAREHOUSE_SIZE='X-SMALL'
AUTO_SUSPEND = 120
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED=TRUE;
USE WAREHOUSE cortex_search_tutorial_wh;
Bemerkung
Die
CREATE DATABASE
-Anweisung erstellt eine Datenbank. Die Datenbank enthält automatisch ein Schema mit dem Namen PUBLIC.Die
CREATE WAREHOUSE
-Anweisung erstellt ein Warehouse, das zunächst angehalten ist.
Schritt 2: Daten in Snowflake laden¶
Erstellen Sie zunächst einen Snowflake-Stagingbereich, um die Dateien zu speichern, die die Daten enthalten. In diesem Stagingbereich werden die PDF-Dateien des Sitzungsprotokolls gespeichert.
CREATE OR REPLACE STAGE cortex_search_tutorial_db.public.fomc
DIRECTORY = (ENABLE = TRUE)
ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
Bemerkung
Das Verzeichnis und die Verschlüsselung werden für die Generierung einer presigned_url für eine Datei konfiguriert. Wenn Sie keine presigned_url generieren müssen, können Sie diese Konfigurationen überspringen.
Laden Sie jetzt das Datenset hoch. Sie können das Datenset über die Snowsight oder über SQL hochladen. Hochladen über die Snowsight:
Melden Sie sich bei Snowsight an.
Wählen Sie im Navigationsmenü auf der linken Seite Data aus.
Wählen Sie Ihre Datenbank
cortex_search_tutorial_db
aus.Wählen Sie Ihr Schema
public
aus.Wählen Sie Stages und dann
fomc
aus.Wählen Sie oben rechts die Schaltfläche + Files aus.
Ziehen Sie Dateien per Drag & Drop in die UI oder wählen Sie Browse, um eine Datei aus dem Dialogfenster auszuwählen.
Wählen Sie Upload aus, um Ihre Datei hochzuladen.
Schritt 3: PDF-Dateien analysieren¶
Erstellen Sie eine Funktion zur Vorverarbeitung, die Folgendes tut:
PDF-Dateien analysieren und den Text extrahieren
Den Text für die Indizierung in kleinere Teile teilen
CREATE OR REPLACE FUNCTION cortex_search_tutorial_db.public.pdf_text_chunker(file_url STRING)
RETURNS TABLE (chunk VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
HANDLER = 'pdf_text_chunker'
PACKAGES = ('snowflake-snowpark-python', 'PyPDF2', 'langchain')
AS
$$
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging
import pandas as pd
class pdf_text_chunker:
def read_pdf(self, file_url: str) -> str:
logger = logging.getLogger("udf_logger")
logger.info(f"Opening file {file_url}")
with SnowflakeFile.open(file_url, 'rb') as f:
buffer = io.BytesIO(f.readall())
reader = PyPDF2.PdfReader(buffer)
text = ""
for page in reader.pages:
try:
text += page.extract_text().replace('\n', ' ').replace('\0', ' ')
except:
text = "Unable to Extract"
logger.warn(f"Unable to extract from file {file_url}, page {page}")
return text
def process(self, file_url: str):
text = self.read_pdf(file_url)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size = 2000, # Adjust this as needed
chunk_overlap = 300, # Overlap to keep chunks contextual
length_function = len
)
chunks = text_splitter.split_text(text)
df = pd.DataFrame(chunks, columns=['chunk'])
yield from df.itertuples(index=False, name=None)
$$;
Erstellen Sie dann eine Tabelle, die die analysierten Daten aus den PDF-Dateien enthält.
CREATE OR REPLACE TABLE cortex_search_tutorial_db.public.docs_chunks_table AS
SELECT
relative_path,
build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path) AS file_url,
-- preserve file title information by concatenating relative_path with the chunk
CONCAT(relative_path, ': ', func.chunk) AS chunk,
'English' AS language
FROM
directory(@cortex_search_tutorial_db.public.fomc),
TABLE(cortex_search_tutorial_db.public.pdf_text_chunker(build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path))) AS func;
Schritt 4: Search Service erstellen¶
Erstellen Sie einen Search Service für die Suche in Ihrer neuen Tabelle, indem Sie den folgenden SQL-Befehl ausführen:
CREATE OR REPLACE CORTEX SEARCH SERVICE cortex_search_tutorial_db.public.fomc_meeting
ON chunk
ATTRIBUTES language
WAREHOUSE = cortex_search_tutorial_wh
TARGET_LAG = '1 hour'
AS (
SELECT
chunk,
relative_path,
file_url,
language
FROM cortex_search_tutorial_db.public.docs_chunks_table
);
Dieser Befehl gibt die attributes
an, d. h. die Spalten, nach denen Sie die Suchergebnisse filtern können, sowie das Warehouse und Zielverzögerungszeit. Die Suchspalte wird als chunk
bezeichnet, die in der Ausgangsabfrage als Verkettung mehrerer Textspalten in der Basistabelle erzeugt wird. Die anderen Spalten in der Ausgangsabfrage können in die Antwort auf eine Suchanfrage einbezogen werden.
Schritt 5: Eine Streamlit-App erstellen¶
Sie können den Dienst mit dem Python-SDK (unter Verwendung des snowflake
-Python-Pakets) abfragen. Dieses Tutorial zeigt die Verwendung des Python-SDK in einer Streamlit in Snowflake-Anwendung.
Vergewissern Sie sich zunächst, dass Ihre globale Snowsight UI-Rolle mit der Rolle übereinstimmt, mit der Sie den Dienst im Schritt „Dienst erstellen“ erstellt haben.
Melden Sie sich bei Snowsight an.
Wählen Sie im Navigationsmenü auf der linken Seite Projects » Streamlit aus.
Wählen Sie + Streamlit App aus.
Wichtig: Wählen Sie die Datenbank
cortex_search_tutorial_db
und das Schemapublic
für den Speicherort der App aus.Wählen Sie im linken Fensterbereich des Streamlit in Snowflake-Editors Packages aus, und fügen Sie
snowflake
(Version >= 0.8.0) undsnowflake-ml-python
hinzu, um die erforderlichen Pakete in Ihrer Anwendung zu installieren.Ersetzen Sie den Code der Beispielanwendung durch die folgende Streamlit-App:
import streamlit as st from snowflake.core import Root # requires snowflake>=0.8.0 from snowflake.cortex import Complete from snowflake.snowpark.context import get_active_session MODELS = [ "mistral-large", "snowflake-arctic", "llama3-70b", "llama3-8b", ] def init_messages(): """ Initialize the session state for chat messages. If the session state indicates that the conversation should be cleared or if the "messages" key is not in the session state, initialize it as an empty list. """ if st.session_state.clear_conversation or "messages" not in st.session_state: st.session_state.messages = [] def init_service_metadata(): """ Initialize the session state for cortex search service metadata. Query the available cortex search services from the Snowflake session and store their names and search columns in the session state. """ if "service_metadata" not in st.session_state: services = session.sql("SHOW CORTEX SEARCH SERVICES;").collect() service_metadata = [] if services: for s in services: svc_name = s["name"] svc_search_col = session.sql( f"DESC CORTEX SEARCH SERVICE {svc_name};" ).collect()[0]["search_column"] service_metadata.append( {"name": svc_name, "search_column": svc_search_col} ) st.session_state.service_metadata = service_metadata def init_config_options(): """ Initialize the configuration options in the Streamlit sidebar. Allow the user to select a cortex search service, clear the conversation, toggle debug mode, and toggle the use of chat history. Also provide advanced options to select a model, the number of context chunks, and the number of chat messages to use in the chat history. """ st.sidebar.selectbox( "Select cortex search service:", [s["name"] for s in st.session_state.service_metadata], key="selected_cortex_search_service", ) st.sidebar.button("Clear conversation", key="clear_conversation") st.sidebar.toggle("Debug", key="debug", value=False) st.sidebar.toggle("Use chat history", key="use_chat_history", value=True) with st.sidebar.expander("Advanced options"): st.selectbox("Select model:", MODELS, key="model_name") st.number_input( "Select number of context chunks", value=5, key="num_retrieved_chunks", min_value=1, max_value=10, ) st.number_input( "Select number of messages to use in chat history", value=5, key="num_chat_messages", min_value=1, max_value=10, ) st.sidebar.expander("Session State").write(st.session_state) def query_cortex_search_service(query, columns = [], filter={}): """ Query the selected cortex search service with the given query and retrieve context documents. Display the retrieved context documents in the sidebar if debug mode is enabled. Return the context documents as a string. Args: query (str): The query to search the cortex search service with. Returns: str: The concatenated string of context documents. """ db, schema = session.get_current_database(), session.get_current_schema() cortex_search_service = ( root.databases[db] .schemas[schema] .cortex_search_services[st.session_state.selected_cortex_search_service] ) context_documents = cortex_search_service.search( query, columns=columns, filter=filter, limit=st.session_state.num_retrieved_chunks ) results = context_documents.results service_metadata = st.session_state.service_metadata search_col = [s["search_column"] for s in service_metadata if s["name"] == st.session_state.selected_cortex_search_service][0].lower() context_str = "" for i, r in enumerate(results): context_str += f"Context document {i+1}: {r[search_col]} \n" + "\n" if st.session_state.debug: st.sidebar.text_area("Context documents", context_str, height=500) return context_str, results def get_chat_history(): """ Retrieve the chat history from the session state limited to the number of messages specified by the user in the sidebar options. Returns: list: The list of chat messages from the session state. """ start_index = max( 0, len(st.session_state.messages) - st.session_state.num_chat_messages ) return st.session_state.messages[start_index : len(st.session_state.messages) - 1] def complete(model, prompt): """ Generate a completion for the given prompt using the specified model. Args: model (str): The name of the model to use for completion. prompt (str): The prompt to generate a completion for. Returns: str: The generated completion. """ return Complete(model, prompt).replace("$", "\$") def make_chat_history_summary(chat_history, question): """ Generate a summary of the chat history combined with the current question to extend the query context. Use the language model to generate this summary. Args: chat_history (str): The chat history to include in the summary. question (str): The current user question to extend with the chat history. Returns: str: The generated summary of the chat history and question. """ prompt = f""" [INST] Based on the chat history below and the question, generate a query that extend the question with the chat history provided. The query should be in natural language. Answer with only the query. Do not add any explanation. <chat_history> {chat_history} </chat_history> <question> {question} </question> [/INST] """ summary = complete(st.session_state.model_name, prompt) if st.session_state.debug: st.sidebar.text_area( "Chat history summary", summary.replace("$", "\$"), height=150 ) return summary def create_prompt(user_question): """ Create a prompt for the language model by combining the user question with context retrieved from the cortex search service and chat history (if enabled). Format the prompt according to the expected input format of the model. Args: user_question (str): The user's question to generate a prompt for. Returns: str: The generated prompt for the language model. """ if st.session_state.use_chat_history: chat_history = get_chat_history() if chat_history != []: question_summary = make_chat_history_summary(chat_history, user_question) prompt_context, results = query_cortex_search_service( question_summary, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) else: prompt_context, results = query_cortex_search_service( user_question, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) else: prompt_context, results = query_cortex_search_service( user_question, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) chat_history = "" prompt = f""" [INST] You are a helpful AI chat assistant with RAG capabilities. When a user asks you a question, you will also be given context provided between <context> and </context> tags. Use that context with the user's chat history provided in the between <chat_history> and </chat_history> tags to provide a summary that addresses the user's question. Ensure the answer is coherent, concise, and directly relevant to the user's question. If the user asks a generic question which cannot be answered with the given context or chat_history, just say "I don't know the answer to that question. Don't saying things like "according to the provided context". <chat_history> {chat_history} </chat_history> <context> {prompt_context} </context> <question> {user_question} </question> [/INST] Answer: """ return prompt, results def main(): st.title(f":speech_balloon: Chatbot with Snowflake Cortex") init_service_metadata() init_config_options() init_messages() icons = {"assistant": "❄️", "user": "👤"} # Display chat messages from history on app rerun for message in st.session_state.messages: with st.chat_message(message["role"], avatar=icons[message["role"]]): st.markdown(message["content"]) disable_chat = ( "service_metadata" not in st.session_state or len(st.session_state.service_metadata) == 0 ) if question := st.chat_input("Ask a question...", disabled=disable_chat): # Add user message to chat history st.session_state.messages.append({"role": "user", "content": question}) # Display user message in chat message container with st.chat_message("user", avatar=icons["user"]): st.markdown(question.replace("$", "\$")) # Display assistant response in chat message container with st.chat_message("assistant", avatar=icons["assistant"]): message_placeholder = st.empty() question = question.replace("'", "") prompt, results = create_prompt(question) with st.spinner("Thinking..."): generated_response = complete( st.session_state.model_name, prompt ) # build references table for citation markdown_table = "###### References \n\n| PDF Title | URL |\n|-------|-----|\n" for ref in results: markdown_table += f"| {ref['relative_path']} | [Link]({ref['file_url']}) |\n" message_placeholder.markdown(generated_response + "\n\n" + markdown_table) st.session_state.messages.append( {"role": "assistant", "content": generated_response} ) if __name__ == "__main__": session = get_active_session() root = Root(session) main()
Schritt 6: App testen¶
Im rechten Bereich des Streamlit in Snowflake-Editorfensters sehen Sie eine Vorschau Ihrer Streamlit-App. Sie sollte ähnlich wie der folgende Screenshot aussehen:

Geben Sie eine Abfrage in das Textfeld ein, um Ihre neue App zu testen. Hier sind einige Beispielabfragen, die Sie ausprobieren können:
- Beispielsitzung 1: Frage-Antwort-Interaktion mit mehreren Runden
How was gpd growth in q4 23?
How was unemployment in the same quarter?
- Beispielsitzung 2: Zusammenfassen mehrerer Dokumente
How has the fed's view of the market change over the course of 2024?
- Beispielsitzung 3: Verzichten, wenn die Dokumente nicht die richtige Antwort enthalten
What was janet yellen's opinion about 2024 q1?
Schritt 7: Bereinigen¶
Bereinigen (optional)¶
Führen Sie die folgenden DROP <Objekt>-Befehle aus, um Ihr System in den Zustand zu versetzen, bevor Sie das Tutorial begonnen haben:
DROP DATABASE IF EXISTS cortex_search_tutorial_db;
DROP WAREHOUSE IF EXISTS cortex_search_tutorial_wh;
Wenn Sie die Datenbank löschen, werden automatisch alle untergeordneten Datenbankobjekte wie Tabellen entfernt.
Nächste Schritte¶
Herzlichen Glückwunsch! Sie haben erfolgreich eine Such-App aus einer Gruppe von PDF-Dateien in Snowflake erstellt.
Zusätzliche Ressourcen¶
Sie können mit den folgenden Ressourcen weiterlernen: