Tutoriel 3 : Construire un Chatbot PDF avec Cortex Search¶
Introduction¶
Ce tutoriel décrit comment créer un chatbot à partir d’un ensemble de données de documents PDF en utilisant Cortex Search. Dans le Tutoriel 2, vous avez appris à créer un Chatbot à partir de données textuelles déjà extraites de sa source. Ce tutoriel présente un exemple d’extraction de ce texte à partir des PDFs en utilisant une fonction UDF Python basique, puis en ingérant les données extraites dans un Cortex Search Service.
Ce que vous apprendrez¶
Extraire du texte à partir d’un ensemble de fichiers PDF dans une zone de préparation à l’aide d’un UDF Python.
Créer un Cortex Search Service à partir du texte extrait.
Créer une application de chat Streamlit-in-Snowflake qui vous permet de poser des questions sur les données extraites des documents PDF.
Conditions préalables¶
Les prérequis suivants sont nécessaires à la réalisation de ce tutoriel :
Vous disposez d’un compte Snowflake et d’un utilisateur avec un rôle qui accorde les privilèges nécessaires pour créer une base de données, des tables, des objets d’entrepôt virtuel, des Cortex Search Services et des applications Streamlit.
Consultez les Snowflake en 20 minutes pour obtenir des instructions afin de répondre à ces exigences.
Étape 1 : Configuration¶
Obtenez les données PDF¶
Vous utiliserez un échantillon de données des comptes rendus des réunions du Comité fédéral de l’Open Market (FOMC) pour ce tutoriel. Il s’agit d’un échantillon de 12 documents de 10 pages avec des notes des réunions du FOMC de 2023 et 2024. Téléchargez les fichiers directement depuis votre navigateur en suivant ce lien :
L’ensemble complet des comptes rendus du FOMC peuvent être trouvées sur le site web de la Réserve fédérale US.
Note
Dans un environnement non didactique, vous apporteriez vos propres données, éventuellement déjà dans une zone de préparation Snowflake.
Créer la base de données, les tables et l’entrepôt¶
Exécutez les instructions suivantes pour créer une base de données et un entrepôt virtuel nécessaires pour ce tutoriel. Après avoir terminé le tutoriel, vous pouvez détruire ces objets.
CREATE DATABASE IF NOT EXISTS cortex_search_tutorial_db;
CREATE OR REPLACE WAREHOUSE cortex_search_tutorial_wh WITH
WAREHOUSE_SIZE='X-SMALL'
AUTO_SUSPEND = 120
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED=TRUE;
USE WAREHOUSE cortex_search_tutorial_wh;
Note
L’instruction
CREATE DATABASE
crée une base de données. La base de données comprend automatiquement un schéma nommé PUBLIC.L’instruction
CREATE WAREHOUSE
crée un entrepôt initialement suspendu.
Étape 2 : charger les données dans Snowflake¶
Créez d’abord une zone de préparation Snowflake pour stocker les fichiers contenant les données. Cette zone de préparation contiendra les fichiers PDF des comptes rendus de la réunion.
CREATE OR REPLACE STAGE cortex_search_tutorial_db.public.fomc
DIRECTORY = (ENABLE = TRUE)
ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
Note
Le répertoire et le chiffrement sont configurés pour générer une presigned_url pour un fichier. Si vous n’avez pas besoin de générer une presigned_url, vous pouvez ignorer ces configurations.
Téléchargez maintenant l’ensemble de données. Vous pouvez télécharger l’ensemble de données dans Snowsight ou en utilisant SQL. Pour télécharger dans Snowsight :
Connectez-vous à Snowsight.
Sélectionnez Data dans le menu de navigation de gauche.
Sélectionnez votre base de données
cortex_search_tutorial_db
.Sélectionnez votre schéma
public
.Sélectionnez Stages et
fomc
.En haut à droite, sélectionnez le bouton + Files.
Faites glisser et déposez les fichiers dans l’UI ou sélectionnez Browse pour choisir un fichier dans la fenêtre de dialogue.
Sélectionnez Upload pour charger votre fichier.
Étape 3 : Analyser les fichiers PDF¶
Créer une fonction de prétraitement pour effectuer les opérations suivantes :
Analyser les fichiers PDF et extraire le texte.
Découper le texte en morceaux plus petits pour l’indexation.
CREATE OR REPLACE FUNCTION cortex_search_tutorial_db.public.pdf_text_chunker(file_url STRING)
RETURNS TABLE (chunk VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
HANDLER = 'pdf_text_chunker'
PACKAGES = ('snowflake-snowpark-python', 'PyPDF2', 'langchain')
AS
$$
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging
import pandas as pd
class pdf_text_chunker:
def read_pdf(self, file_url: str) -> str:
logger = logging.getLogger("udf_logger")
logger.info(f"Opening file {file_url}")
with SnowflakeFile.open(file_url, 'rb') as f:
buffer = io.BytesIO(f.readall())
reader = PyPDF2.PdfReader(buffer)
text = ""
for page in reader.pages:
try:
text += page.extract_text().replace('\n', ' ').replace('\0', ' ')
except:
text = "Unable to Extract"
logger.warn(f"Unable to extract from file {file_url}, page {page}")
return text
def process(self, file_url: str):
text = self.read_pdf(file_url)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size = 2000, # Adjust this as needed
chunk_overlap = 300, # Overlap to keep chunks contextual
length_function = len
)
chunks = text_splitter.split_text(text)
df = pd.DataFrame(chunks, columns=['chunk'])
yield from df.itertuples(index=False, name=None)
$$;
Créer ensuite une table pour contenir les données analysées à partir des fichiers PDF.
CREATE OR REPLACE TABLE cortex_search_tutorial_db.public.docs_chunks_table AS
SELECT
relative_path,
build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path) AS file_url,
-- preserve file title information by concatenating relative_path with the chunk
CONCAT(relative_path, ': ', func.chunk) AS chunk,
'English' AS language
FROM
directory(@cortex_search_tutorial_db.public.fomc),
TABLE(cortex_search_tutorial_db.public.pdf_text_chunker(build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path))) AS func;
Étape 4 : Créer un service de recherche¶
Créer un service de recherche sur votre nouvelle table en exécutant la commande SQL suivante :
CREATE OR REPLACE CORTEX SEARCH SERVICE cortex_search_tutorial_db.public.fomc_meeting
ON chunk
ATTRIBUTES language
WAREHOUSE = cortex_search_tutorial_wh
TARGET_LAG = '1 hour'
AS (
SELECT
chunk,
relative_path,
file_url,
language
FROM cortex_search_tutorial_db.public.docs_chunks_table
);
Cette commande spécifie les attributes
, qui sont les colonnes sur lesquelles vous pourrez filtrer les résultats de recherche, ainsi que l’entrepôt et la latence cible. La colonne de recherche est désignée comme chunk
, qui est généré dans la requête source sous la forme d’une concaténation de plusieurs colonnes de texte dans la table de base. Les autres colonnes de la requête source peuvent être incluses en réponse à une demande de recherche.
Étape 5 : Créer une application Streamlit¶
Vous pouvez interroger le service avec le SDK Python (en utilisant le Paquet Python snowflake
). Ce tutoriel montre comment utiliser le SDK Python dans une application Streamlit in Snowflake.
Tout d’abord, assurez-vous que votre rôle global de la Snowsight UI est le même que le rôle utilisé pour créer le service à l’étape de création du service.
Connectez-vous à Snowsight.
Sélectionnez Projects » Streamlit dans le menu de navigation de gauche.
Sélectionnez + Streamlit App.
Important : Sélectionnez la base de données
cortex_search_tutorial_db
et le schémapublic
pour l’emplacement de l’application.Dans le volet gauche de l’éditeur Streamlit in Snowflake, sélectionnez Packages et ajouter
snowflake
(version> = 0.8.0) etsnowflake-ml-python
pour installer les paquets requis dans votre application.Remplacez l’exemple de code d’application par l’application Streamlit suivante :
import streamlit as st from snowflake.core import Root # requires snowflake>=0.8.0 from snowflake.cortex import Complete from snowflake.snowpark.context import get_active_session MODELS = [ "mistral-large", "snowflake-arctic", "llama3-70b", "llama3-8b", ] def init_messages(): """ Initialize the session state for chat messages. If the session state indicates that the conversation should be cleared or if the "messages" key is not in the session state, initialize it as an empty list. """ if st.session_state.clear_conversation or "messages" not in st.session_state: st.session_state.messages = [] def init_service_metadata(): """ Initialize the session state for cortex search service metadata. Query the available cortex search services from the Snowflake session and store their names and search columns in the session state. """ if "service_metadata" not in st.session_state: services = session.sql("SHOW CORTEX SEARCH SERVICES;").collect() service_metadata = [] if services: for s in services: svc_name = s["name"] svc_search_col = session.sql( f"DESC CORTEX SEARCH SERVICE {svc_name};" ).collect()[0]["search_column"] service_metadata.append( {"name": svc_name, "search_column": svc_search_col} ) st.session_state.service_metadata = service_metadata def init_config_options(): """ Initialize the configuration options in the Streamlit sidebar. Allow the user to select a cortex search service, clear the conversation, toggle debug mode, and toggle the use of chat history. Also provide advanced options to select a model, the number of context chunks, and the number of chat messages to use in the chat history. """ st.sidebar.selectbox( "Select cortex search service:", [s["name"] for s in st.session_state.service_metadata], key="selected_cortex_search_service", ) st.sidebar.button("Clear conversation", key="clear_conversation") st.sidebar.toggle("Debug", key="debug", value=False) st.sidebar.toggle("Use chat history", key="use_chat_history", value=True) with st.sidebar.expander("Advanced options"): st.selectbox("Select model:", MODELS, key="model_name") st.number_input( "Select number of context chunks", value=5, key="num_retrieved_chunks", min_value=1, max_value=10, ) st.number_input( "Select number of messages to use in chat history", value=5, key="num_chat_messages", min_value=1, max_value=10, ) st.sidebar.expander("Session State").write(st.session_state) def query_cortex_search_service(query, columns = [], filter={}): """ Query the selected cortex search service with the given query and retrieve context documents. Display the retrieved context documents in the sidebar if debug mode is enabled. Return the context documents as a string. Args: query (str): The query to search the cortex search service with. Returns: str: The concatenated string of context documents. """ db, schema = session.get_current_database(), session.get_current_schema() cortex_search_service = ( root.databases[db] .schemas[schema] .cortex_search_services[st.session_state.selected_cortex_search_service] ) context_documents = cortex_search_service.search( query, columns=columns, filter=filter, limit=st.session_state.num_retrieved_chunks ) results = context_documents.results service_metadata = st.session_state.service_metadata search_col = [s["search_column"] for s in service_metadata if s["name"] == st.session_state.selected_cortex_search_service][0].lower() context_str = "" for i, r in enumerate(results): context_str += f"Context document {i+1}: {r[search_col]} \n" + "\n" if st.session_state.debug: st.sidebar.text_area("Context documents", context_str, height=500) return context_str, results def get_chat_history(): """ Retrieve the chat history from the session state limited to the number of messages specified by the user in the sidebar options. Returns: list: The list of chat messages from the session state. """ start_index = max( 0, len(st.session_state.messages) - st.session_state.num_chat_messages ) return st.session_state.messages[start_index : len(st.session_state.messages) - 1] def complete(model, prompt): """ Generate a completion for the given prompt using the specified model. Args: model (str): The name of the model to use for completion. prompt (str): The prompt to generate a completion for. Returns: str: The generated completion. """ return Complete(model, prompt).replace("$", "\$") def make_chat_history_summary(chat_history, question): """ Generate a summary of the chat history combined with the current question to extend the query context. Use the language model to generate this summary. Args: chat_history (str): The chat history to include in the summary. question (str): The current user question to extend with the chat history. Returns: str: The generated summary of the chat history and question. """ prompt = f""" [INST] Based on the chat history below and the question, generate a query that extend the question with the chat history provided. The query should be in natural language. Answer with only the query. Do not add any explanation. <chat_history> {chat_history} </chat_history> <question> {question} </question> [/INST] """ summary = complete(st.session_state.model_name, prompt) if st.session_state.debug: st.sidebar.text_area( "Chat history summary", summary.replace("$", "\$"), height=150 ) return summary def create_prompt(user_question): """ Create a prompt for the language model by combining the user question with context retrieved from the cortex search service and chat history (if enabled). Format the prompt according to the expected input format of the model. Args: user_question (str): The user's question to generate a prompt for. Returns: str: The generated prompt for the language model. """ if st.session_state.use_chat_history: chat_history = get_chat_history() if chat_history != []: question_summary = make_chat_history_summary(chat_history, user_question) prompt_context, results = query_cortex_search_service( question_summary, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) else: prompt_context, results = query_cortex_search_service( user_question, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) else: prompt_context, results = query_cortex_search_service( user_question, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) chat_history = "" prompt = f""" [INST] You are a helpful AI chat assistant with RAG capabilities. When a user asks you a question, you will also be given context provided between <context> and </context> tags. Use that context with the user's chat history provided in the between <chat_history> and </chat_history> tags to provide a summary that addresses the user's question. Ensure the answer is coherent, concise, and directly relevant to the user's question. If the user asks a generic question which cannot be answered with the given context or chat_history, just say "I don't know the answer to that question. Don't saying things like "according to the provided context". <chat_history> {chat_history} </chat_history> <context> {prompt_context} </context> <question> {user_question} </question> [/INST] Answer: """ return prompt, results def main(): st.title(f":speech_balloon: Chatbot with Snowflake Cortex") init_service_metadata() init_config_options() init_messages() icons = {"assistant": "❄️", "user": "👤"} # Display chat messages from history on app rerun for message in st.session_state.messages: with st.chat_message(message["role"], avatar=icons[message["role"]]): st.markdown(message["content"]) disable_chat = ( "service_metadata" not in st.session_state or len(st.session_state.service_metadata) == 0 ) if question := st.chat_input("Ask a question...", disabled=disable_chat): # Add user message to chat history st.session_state.messages.append({"role": "user", "content": question}) # Display user message in chat message container with st.chat_message("user", avatar=icons["user"]): st.markdown(question.replace("$", "\$")) # Display assistant response in chat message container with st.chat_message("assistant", avatar=icons["assistant"]): message_placeholder = st.empty() question = question.replace("'", "") prompt, results = create_prompt(question) with st.spinner("Thinking..."): generated_response = complete( st.session_state.model_name, prompt ) # build references table for citation markdown_table = "###### References \n\n| PDF Title | URL |\n|-------|-----|\n" for ref in results: markdown_table += f"| {ref['relative_path']} | [Link]({ref['file_url']}) |\n" message_placeholder.markdown(generated_response + "\n\n" + markdown_table) st.session_state.messages.append( {"role": "assistant", "content": generated_response} ) if __name__ == "__main__": session = get_active_session() root = Root(session) main()
Étape 6 : Essayer l’application¶
Dans le volet droit de la fenêtre de l’éditeur Streamlit in Snowflake, vous verrez un aperçu de votre application Streamlit. Cela devrait ressembler à la capture d’écran suivante :

Saisissez une requête dans la zone de texte pour tester votre nouvelle application. Voici quelques exemples de requêtes que vous pouvez essayer :
- Exemple de session 1 : questions-réponses à plusieurs tours
How was gpd growth in q4 23?
How was unemployment in the same quarter?
- Exemple de session 2 : résumer plusieurs documents
How has the fed's view of the market change over the course of 2024?
- Exemple de session 3 : s’abstenir lorsque les documents ne contiennent pas la bonne réponse
What was janet yellen's opinion about 2024 q1?
Étape 7 : Nettoyage¶
Nettoyer (facultatif)¶
Exécutez les commandes DROP <objet> suivantes pour remettre votre système dans son état initial avant de commencer le tutoriel :
DROP DATABASE IF EXISTS cortex_search_tutorial_db;
DROP WAREHOUSE IF EXISTS cortex_search_tutorial_wh;
Détruire la base de données supprime automatiquement toutes les objets de base de données liés, par exemple les tables.
Prochaines étapes¶
Félicitations ! Vous avez réussi à créer une application de recherche à partir d’un ensemble de fichiers PDF dans Snowflake.
Ressources supplémentaires¶
Vous pouvez poursuivre l’entraînement en utilisant les ressources suivantes :