Tutorial 3: Crie um chatbot PDF com o Cortex Search¶
Introdução¶
Este tutorial descreve como criar um chatbot a partir de um conjunto de dados de documentos PDF usando o Cortex Search. No Tutorial 2, você aprendeu como criar um chatbot a partir de dados de texto que já foram extraídos de sua fonte. Este tutorial mostra um exemplo de extração de texto de PDFs usando uma UDF de Python básica e, em seguida, ingerindo os dados extraídos em um Cortex Search Service.
O que você aprenderá¶
Extraia texto de um conjunto de arquivos PDF em um estágio usando uma UDF Python.
Crie um Cortex Search Service a partir do texto extraído.
Crie um aplicativo de bate-papo Streamlit-in-Snowflake que permita que você faça perguntas sobre os dados extraídos dos documentos PDF.
Pré-requisitos¶
Os seguintes pré-requisitos são necessários para concluir este tutorial:
Você tem uma conta Snowflake e um usuário com uma função que concede os privilégios necessários para criar um banco de dados, tabelas, objetos de warehouse virtual, Cortex Search Services e aplicativos Streamlit.
Consulte o Snowflake em 20 minutos para obter instruções para atender a estes requisitos.
Etapa 1: Configuração¶
Como obter dados PDF¶
Você usará um conjunto de dados de amostra das atas das reuniões do Comitê Federal de Mercado Aberto (FOMC) para este tutorial. Esta é um amostra de 12 documentos de 10 páginas com notas de reuniões do FOMC de 2023 e 2024. Baixe os arquivos diretamente do seu navegador seguindo este link:
O conjunto completo de atas do FOMC pode ser encontrado no site do US Federal Reserve
Nota
Em um configuração não tutorial, você traria seus próprios dados, possivelmente já em um estágio Snowflake.
Criação de banco de dados, tabelas e warehouse¶
Execute as seguintes instruções para criar um banco de dados e um warehouse virtual necessários para este tutorial. Depois de concluir o tutorial, você poderá descartar estes objetos.
CREATE DATABASE IF NOT EXISTS cortex_search_tutorial_db;
CREATE OR REPLACE WAREHOUSE cortex_search_tutorial_wh WITH
WAREHOUSE_SIZE='X-SMALL'
AUTO_SUSPEND = 120
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED=TRUE;
USE WAREHOUSE cortex_search_tutorial_wh;
Nota
A instrução
CREATE DATABASE
cria um banco de dados. O banco de dados inclui automaticamente um esquema chamado PUBLIC.A instrução
CREATE WAREHOUSE
cria um warehouse inicialmente suspenso.
Etapa 2: Carregamento de dados para o Snowflake¶
Primeiro crie um estágio Snowflake para armazenar os arquivos que contêm os dados. Esta estágio abrigará os arquivos PDF das atas das reuniões.
CREATE OR REPLACE STAGE cortex_search_tutorial_db.public.fomc
DIRECTORY = (ENABLE = TRUE)
ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
Nota
O diretório e a criptografia são configurados para gerar presigned_url para um arquivo. Se você não precisa gerar presigned_url, pode pular essas configurações.
Agora carregue o conjunto de dados. Você pode carregar o conjunto de dados em Snowsight ou usando SQL. Para carregar em Snowsight:
Faça login no Snowsight.
Selecione Data no menu de navegação do lado esquerdo.
Selecione seu banco de dados
cortex_search_tutorial_db
.Selecione seu esquema
public
.Selecione Stages e
fomc
.No canto superior direito, selecione o botão + Files.
Arraste e solte arquivos na UI ou selecione Browse para escolher um arquivo na janela de diálogo.
Selecione Upload para enviar seu arquivo.
Etapa 3: Análise de arquivos PDF¶
Crie uma função de pré-processamento para fazer o seguinte:
Analisar os arquivos PDF e extrair o texto.
Divida o texto em partes menores para indexação.
CREATE OR REPLACE FUNCTION cortex_search_tutorial_db.public.pdf_text_chunker(file_url STRING)
RETURNS TABLE (chunk VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
HANDLER = 'pdf_text_chunker'
PACKAGES = ('snowflake-snowpark-python', 'PyPDF2', 'langchain')
AS
$$
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging
import pandas as pd
class pdf_text_chunker:
def read_pdf(self, file_url: str) -> str:
logger = logging.getLogger("udf_logger")
logger.info(f"Opening file {file_url}")
with SnowflakeFile.open(file_url, 'rb') as f:
buffer = io.BytesIO(f.readall())
reader = PyPDF2.PdfReader(buffer)
text = ""
for page in reader.pages:
try:
text += page.extract_text().replace('\n', ' ').replace('\0', ' ')
except:
text = "Unable to Extract"
logger.warn(f"Unable to extract from file {file_url}, page {page}")
return text
def process(self, file_url: str):
text = self.read_pdf(file_url)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size = 2000, # Adjust this as needed
chunk_overlap = 300, # Overlap to keep chunks contextual
length_function = len
)
chunks = text_splitter.split_text(text)
df = pd.DataFrame(chunks, columns=['chunk'])
yield from df.itertuples(index=False, name=None)
$$;
Em seguida, crie uma tabela para armazenar os dados analisados dos arquivos PDF.
CREATE OR REPLACE TABLE cortex_search_tutorial_db.public.docs_chunks_table AS
SELECT
relative_path,
build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path) AS file_url,
-- preserve file title information by concatenating relative_path with the chunk
CONCAT(relative_path, ': ', func.chunk) AS chunk,
'English' AS language
FROM
directory(@cortex_search_tutorial_db.public.fomc),
TABLE(cortex_search_tutorial_db.public.pdf_text_chunker(build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path))) AS func;
Etapa 4: Criação do serviço de pesquisa¶
Crie um serviço de pesquisa em sua nova tabela executando o seguinte comando SQL:
CREATE OR REPLACE CORTEX SEARCH SERVICE cortex_search_tutorial_db.public.fomc_meeting
ON chunk
ATTRIBUTES language
WAREHOUSE = cortex_search_tutorial_wh
TARGET_LAG = '1 hour'
AS (
SELECT
chunk,
relative_path,
file_url,
language
FROM cortex_search_tutorial_db.public.docs_chunks_table
);
Este comando especifica attributes
, que são as colunas nas quais você poderá filtrar os resultados da pesquisa, bem como o warehouse e a meta de atraso. A coluna de pesquisa é designada como chunk
, que é gerada na consulta de origem como uma concatenação de várias colunas de texto na tabela base. As outras colunas na consulta de origem podem ser incluídas em resposta a uma solicitação de pesquisa.
Etapa 5: Criação de um aplicativo Streamlit¶
Você pode consulta o serviço com Python SDK (usando o pacote Python snowflake
). Este tutorial demonstra o uso do Python SDK em um aplicativo Streamlit in Snowflake.
Primeiro, certifique-se de que sua função de Snowsight UI global seja a mesma que a função usada para criar o serviço na etapa de criação do serviço.
Faça login no Snowsight.
Selecione Projects » Streamlit no menu de navegação do lado esquerdo.
Selecione + Streamlit App.
Importante: selecione o banco de dados
cortex_search_tutorial_db
e o esquemapublic
para o local do aplicativo.No painel esquerdo do editor Streamlit in Snowflake, selecione Packages e adicione
snowflake
(versão >= 0.8.0) esnowflake-ml-python
para instalar os pacotes necessários em seu aplicativo.Substitua o código do aplicativo de exemplo pelo seguinte aplicativo Streamlit:
import streamlit as st from snowflake.core import Root # requires snowflake>=0.8.0 from snowflake.cortex import Complete from snowflake.snowpark.context import get_active_session MODELS = [ "mistral-large", "snowflake-arctic", "llama3-70b", "llama3-8b", ] def init_messages(): """ Initialize the session state for chat messages. If the session state indicates that the conversation should be cleared or if the "messages" key is not in the session state, initialize it as an empty list. """ if st.session_state.clear_conversation or "messages" not in st.session_state: st.session_state.messages = [] def init_service_metadata(): """ Initialize the session state for cortex search service metadata. Query the available cortex search services from the Snowflake session and store their names and search columns in the session state. """ if "service_metadata" not in st.session_state: services = session.sql("SHOW CORTEX SEARCH SERVICES;").collect() service_metadata = [] if services: for s in services: svc_name = s["name"] svc_search_col = session.sql( f"DESC CORTEX SEARCH SERVICE {svc_name};" ).collect()[0]["search_column"] service_metadata.append( {"name": svc_name, "search_column": svc_search_col} ) st.session_state.service_metadata = service_metadata def init_config_options(): """ Initialize the configuration options in the Streamlit sidebar. Allow the user to select a cortex search service, clear the conversation, toggle debug mode, and toggle the use of chat history. Also provide advanced options to select a model, the number of context chunks, and the number of chat messages to use in the chat history. """ st.sidebar.selectbox( "Select cortex search service:", [s["name"] for s in st.session_state.service_metadata], key="selected_cortex_search_service", ) st.sidebar.button("Clear conversation", key="clear_conversation") st.sidebar.toggle("Debug", key="debug", value=False) st.sidebar.toggle("Use chat history", key="use_chat_history", value=True) with st.sidebar.expander("Advanced options"): st.selectbox("Select model:", MODELS, key="model_name") st.number_input( "Select number of context chunks", value=5, key="num_retrieved_chunks", min_value=1, max_value=10, ) st.number_input( "Select number of messages to use in chat history", value=5, key="num_chat_messages", min_value=1, max_value=10, ) st.sidebar.expander("Session State").write(st.session_state) def query_cortex_search_service(query, columns = [], filter={}): """ Query the selected cortex search service with the given query and retrieve context documents. Display the retrieved context documents in the sidebar if debug mode is enabled. Return the context documents as a string. Args: query (str): The query to search the cortex search service with. Returns: str: The concatenated string of context documents. """ db, schema = session.get_current_database(), session.get_current_schema() cortex_search_service = ( root.databases[db] .schemas[schema] .cortex_search_services[st.session_state.selected_cortex_search_service] ) context_documents = cortex_search_service.search( query, columns=columns, filter=filter, limit=st.session_state.num_retrieved_chunks ) results = context_documents.results service_metadata = st.session_state.service_metadata search_col = [s["search_column"] for s in service_metadata if s["name"] == st.session_state.selected_cortex_search_service][0].lower() context_str = "" for i, r in enumerate(results): context_str += f"Context document {i+1}: {r[search_col]} \n" + "\n" if st.session_state.debug: st.sidebar.text_area("Context documents", context_str, height=500) return context_str, results def get_chat_history(): """ Retrieve the chat history from the session state limited to the number of messages specified by the user in the sidebar options. Returns: list: The list of chat messages from the session state. """ start_index = max( 0, len(st.session_state.messages) - st.session_state.num_chat_messages ) return st.session_state.messages[start_index : len(st.session_state.messages) - 1] def complete(model, prompt): """ Generate a completion for the given prompt using the specified model. Args: model (str): The name of the model to use for completion. prompt (str): The prompt to generate a completion for. Returns: str: The generated completion. """ return Complete(model, prompt).replace("$", "\$") def make_chat_history_summary(chat_history, question): """ Generate a summary of the chat history combined with the current question to extend the query context. Use the language model to generate this summary. Args: chat_history (str): The chat history to include in the summary. question (str): The current user question to extend with the chat history. Returns: str: The generated summary of the chat history and question. """ prompt = f""" [INST] Based on the chat history below and the question, generate a query that extend the question with the chat history provided. The query should be in natural language. Answer with only the query. Do not add any explanation. <chat_history> {chat_history} </chat_history> <question> {question} </question> [/INST] """ summary = complete(st.session_state.model_name, prompt) if st.session_state.debug: st.sidebar.text_area( "Chat history summary", summary.replace("$", "\$"), height=150 ) return summary def create_prompt(user_question): """ Create a prompt for the language model by combining the user question with context retrieved from the cortex search service and chat history (if enabled). Format the prompt according to the expected input format of the model. Args: user_question (str): The user's question to generate a prompt for. Returns: str: The generated prompt for the language model. """ if st.session_state.use_chat_history: chat_history = get_chat_history() if chat_history != []: question_summary = make_chat_history_summary(chat_history, user_question) prompt_context, results = query_cortex_search_service( question_summary, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) else: prompt_context, results = query_cortex_search_service( user_question, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) else: prompt_context, results = query_cortex_search_service( user_question, columns=["chunk", "file_url", "relative_path"], filter={"@and": [{"@eq": {"language": "English"}}]}, ) chat_history = "" prompt = f""" [INST] You are a helpful AI chat assistant with RAG capabilities. When a user asks you a question, you will also be given context provided between <context> and </context> tags. Use that context with the user's chat history provided in the between <chat_history> and </chat_history> tags to provide a summary that addresses the user's question. Ensure the answer is coherent, concise, and directly relevant to the user's question. If the user asks a generic question which cannot be answered with the given context or chat_history, just say "I don't know the answer to that question. Don't saying things like "according to the provided context". <chat_history> {chat_history} </chat_history> <context> {prompt_context} </context> <question> {user_question} </question> [/INST] Answer: """ return prompt, results def main(): st.title(f":speech_balloon: Chatbot with Snowflake Cortex") init_service_metadata() init_config_options() init_messages() icons = {"assistant": "❄️", "user": "👤"} # Display chat messages from history on app rerun for message in st.session_state.messages: with st.chat_message(message["role"], avatar=icons[message["role"]]): st.markdown(message["content"]) disable_chat = ( "service_metadata" not in st.session_state or len(st.session_state.service_metadata) == 0 ) if question := st.chat_input("Ask a question...", disabled=disable_chat): # Add user message to chat history st.session_state.messages.append({"role": "user", "content": question}) # Display user message in chat message container with st.chat_message("user", avatar=icons["user"]): st.markdown(question.replace("$", "\$")) # Display assistant response in chat message container with st.chat_message("assistant", avatar=icons["assistant"]): message_placeholder = st.empty() question = question.replace("'", "") prompt, results = create_prompt(question) with st.spinner("Thinking..."): generated_response = complete( st.session_state.model_name, prompt ) # build references table for citation markdown_table = "###### References \n\n| PDF Title | URL |\n|-------|-----|\n" for ref in results: markdown_table += f"| {ref['relative_path']} | [Link]({ref['file_url']}) |\n" message_placeholder.markdown(generated_response + "\n\n" + markdown_table) st.session_state.messages.append( {"role": "assistant", "content": generated_response} ) if __name__ == "__main__": session = get_active_session() root = Root(session) main()
Etapa 6: Teste do aplicativo¶
No painel direito da janela do editor Streamlit in Snowflake, você verá uma versão do seu aplicativo Streamlit. Deve ser semelhante à captura de tela a seguir:

Insira uma consulta na caixa de texto para testar seu novo aplicativo. Alguns amostra de consultas que você pode tentar são:
- Exemplo de sessão 1: perguntas e respostas em várias etapas
How was gpd growth in q4 23?
How was unemployment in the same quarter?
- Exemplo de sessão 2: resumindo vários documentos
How has the fed's view of the market change over the course of 2024?
- Exemplo de sessão 3: abstenção quando os documentos não contêm a resposta correta
What was janet yellen's opinion about 2024 q1?
Etapa 7: limpeza¶
Limpeza (opcional)¶
Execute os seguintes comandos DROP <objeto> para retornar o sistema ao seu estado antes de iniciar o tutorial:
DROP DATABASE IF EXISTS cortex_search_tutorial_db;
DROP WAREHOUSE IF EXISTS cortex_search_tutorial_wh;
Descartar o banco de dados remove automaticamente todos os objetos do banco de dados filho, tais como tabelas.
Próximos passos¶
Parabéns! Você criou com sucesso um aplicativo de pesquisa a partir de um conjunto de arquivos PDF no Snowflake.
Recursos adicionais¶
Você pode continuar aprendendo usando os seguintes recursos: