Tutoriel 2 : Interfaces du consommateur avec une CKE dans un chatbot Streamlit

Introduction

Dans ce tutoriel, vous configurerez un pipeline personnalisé de génération augmentée par récupération (RAG) pour intégrer les connaissances d’une Cortex Knowledge Extension dans un chatbot.

Voici comment cela fonctionne :

  1. Une application Streamlit accepte une demande de la part d’un utilisateur.

  2. Le prompt est donné à l’API de Cortex Search Query avec la Cortex Knowledge Extension / le Cortex Search Service configuré.

  3. L’application Streamlit récupère les documents, les place dans la fenêtre contextuelle avec un prompt personnalisé et les envoie à la fonction Cortex LLM Complete avec un LLM spécifié.

Note

Ce tutoriel part du principe que vous disposez déjà d’un CKE. Allez sur Snowflake Marketplace et accédez à l’un d’eux, ou utilisez le Tutoriel 1 pour en créer un.

Un organigramme montrant le flux de travail de la CKE, depuis le Cortex Search Service d'un fournisseur jusqu'à un index de recherche, en passant par une réponse dans le prompt d'un consommateur.

Étape 1. Configurer votre environnement

L’exemple ci-dessous configure un environnement et crée une application Streamlit que vous pouvez exécuter dans Snowflake pour tester une Cortex Knowledge Extension. Cela suppose que le consommateur a accès à une Cortex Knowledge Extension partagée par un fournisseur.

  1. Connectez-vous à Snowsight.

  2. Dans la barre de navigation de gauche, sélectionnez Projects » Streamlit.

  3. Sélectionnez + Streamlit App.

    La fenêtre Create Streamlit App s’ouvre.

  4. Saisissez un nom pour votre application.

  5. Dans la liste déroulante App location sélectionnez la base de données et le schéma de votre application.

  6. Dans la liste déroulante Warehouse sélectionnez l’entrepôt dans lequel vous souhaitez exécuter votre application et les requêtes.

  7. Sélectionnez Create.

    L’éditeur Streamlit dans Snowflake ouvre un exemple d’application Streamlit en mode visualisation. Le mode visualisation vous permet de voir comment l’application Streamlit apparaît yeux des utilisateurs.

  8. Vérifiez que les bons paquets et les bonnes versions sont installés comme dans l’image ci-dessous.

Une capture d'écran montrant les paquets dont l'installation est exigée pour utiliser le CKE.

Étape 2 : Créer une application Streamlit pour votre testeur de chat CKE

Le code ci-dessous est une simple application Streamlit qui vous permet de tester le CKE. L’application utilise le paquet Snowflake ML Python pour appeler la Cortex Knowledge Extension et la fonction Snowflake LLM Complete. L’application vous permet de sélectionner une Cortex Knowledge Extension, de saisir une question et de recevoir une réponse du LLM. Elle propose également des options de débogage et d’utilisation de l’historique du chat.

  1. Dans la barre de navigation de gauche, sélectionnez Projects » Streamlit.

  2. Sélectionnez l’application Streamlit que vous avez créée à l’étape précédente.

  3. Dans l’éditeur Streamlit dans Snowflake, sélectionnez Edit » Edit code.

    L’éditeur Streamlit dans Snowflake s’ouvre en mode édition.

  4. Dans la barre de navigation de gauche, sélectionnez streamlit_app.py pour ouvrir l’éditeur de code.

  5. Dans l’éditeur de code, supprimez le code existant.

  6. Copiez le code ci-dessous et collez-le dans l’éditeur de code, puis sélectionnez Save » Save and run.

    L’éditeur Streamlit dans Snowflake exécute l’application et l’ouvre en mode visualisation.

import streamlit as st
from snowflake.core import Root
from snowflake.cortex import Complete
from snowflake.snowpark.context import get_active_session

MODELS = [
    "llama3.1-8b",
    "llama3.1-70b",
    "llama3.1-405b"
]

def init_messages():
    """Initialize session state messages if not present or if we need to clear."""
    if st.session_state.get("clear_conversation") or "messages" not in st.session_state:
        st.session_state.messages = []
        st.session_state.clear_conversation = False

def init_service_metadata():
    """Load or refresh cortex search services from Snowflake."""
    services = session.sql("SHOW CORTEX SEARCH SERVICES IN ACCOUNT;").collect()
    service_metadata = []
    if services:
        for s in services:
            svc_name = s["name"]
            svc_schema = s["schema_name"]
            svc_db = s["database_name"]
            svc_search_col = session.sql(
                f"DESC CORTEX SEARCH SERVICE {svc_db}.{svc_schema}.{svc_name};"
            ).collect()[0]["search_column"]
            service_metadata.append(
                {
                    "name": svc_name,
                    "search_column": svc_search_col,
                    "db": svc_db,
                    "schema": svc_schema,
                }
            )

    st.session_state.service_metadata = service_metadata

    # Initialize selected_cortex_search_service if it doesn't exist
    if "selected_cortex_search_service" not in st.session_state and service_metadata:
        st.session_state.selected_cortex_search_service = service_metadata[0]["name"]

    selected_entry = st.session_state.get("selected_cortex_search_service")

    if selected_entry:
        # Find matching service metadata
        selected_service_metadata = next(
            (svc for svc in st.session_state.service_metadata if svc["name"] == selected_entry),
            None
        )

        if selected_service_metadata:
            # Store them in session_state
            st.session_state.selected_schema = selected_service_metadata["schema"]
            st.session_state.selected_db = selected_service_metadata["db"]
        elif st.session_state.get("debug", False):
            st.write("No matching service found for:", selected_entry)

def init_config_options():
    if "service_metadata" not in st.session_state or not st.session_state.service_metadata:
        st.sidebar.warning("No Cortex Knowledge Extensions available")
        return

    st.sidebar.selectbox(
        "Select Cortex Knowledge Extension",
        [s["name"] for s in st.session_state.service_metadata],
        key="selected_cortex_search_service",
    )
    if st.sidebar.button("Clear conversation"):
        st.session_state.clear_conversation = True

    # If st.sidebar.toggle isn't available, use st.sidebar.checkbox:
    st.sidebar.checkbox("Debug", key="debug", value=False)
    st.sidebar.checkbox("Use chat history", key="use_chat_history", value=True)

    with st.sidebar.expander("Advanced options"):
        st.selectbox("Select model:", MODELS, key="model_name")
        st.number_input(
            "Select number of context chunks",
            value=5,
            key="num_retrieved_chunks",
            min_value=1,
            max_value=10,
        )
        st.number_input(
            "Select number of messages to use in chat history",
            value=5,
            key="num_chat_messages",
            min_value=1,
            max_value=10,
        )

    st.sidebar.expander("Session State").write(st.session_state)

def get_chat_history():
    """Get the last N messages from session state."""
    start_index = max(
        0, len(st.session_state.messages) - st.session_state.num_chat_messages
    )
    return st.session_state.messages[start_index : len(st.session_state.messages) - 1]

def complete(model, prompt):
    """Use the chosen Snowflake cortex model to complete a prompt."""
    return Complete(model=model, prompt=prompt).replace("$", "\\$")

def make_chat_history_summary(chat_history, question):
    """
    Summarize the chat history plus the question using your LLM,
    to refine the final search query.
    """
    prompt = f"""
    [INST]
    Based on the chat history below and the question, generate a query that extend the question
    with the chat history provided. The query should be in natural language.
    Answer with only the query. Do not add any explanation.

    <chat_history>
    {chat_history}
    </chat_history>
    <question>
    {question}
    </question>
    [/INST]
    """
    summary = complete(st.session_state.model_name, prompt)
    if st.session_state.debug:
        st.sidebar.text_area("Chat history summary", summary.replace("$", "\\$"), height=150)
    return summary

def query_cortex_search_service(query, columns=[], filter={}):
    """
    Query the selected cortex search service with the given query and retrieve context documents.
    """
    # Safely retrieve from session_state
    db = st.session_state.get("selected_db")
    schema = st.session_state.get("selected_schema")

    if st.session_state.get("debug", False):
        st.sidebar.write("Query:", query)
        st.sidebar.write("DB:", db)
        st.sidebar.write("Schema:", schema)
        st.sidebar.write("Service:", st.session_state.selected_cortex_search_service)

    cortex_search_service = (
        root.databases[db]
        .schemas[schema]
        .cortex_search_services[st.session_state.selected_cortex_search_service]
    )

    context_documents = cortex_search_service.search(
        query,
        columns=columns,
        filter=filter,
        limit=st.session_state.num_retrieved_chunks
    )

    results = context_documents.results

    if st.session_state.get("debug", False):
        st.sidebar.write("Search Results:", results)

    service_metadata = st.session_state.service_metadata
    search_col = [
        s["search_column"] for s in service_metadata
        if s["name"] == st.session_state.selected_cortex_search_service
    ][0].lower()

    # Build a context string for the prompt
    context_str = ""
    context_str_template = (
        "Source: {source_url}\n"
        "Source ID: {id}\n"
        "Excerpt: {chunk}\n\n\n"
    )
    for i, r in enumerate(results):
        context_str += context_str_template.format(
            id=i+1,
            chunk=r[search_col],
            source_url=r["source_url"],
            title=r["document_title"],
        )
    if st.session_state.debug:
        st.sidebar.text_area("Context documents", context_str, height=500)

    return context_str, results

def create_prompt(user_question):
    """
    Combine user question, context from the search service, and chat history
    to create a final prompt for the LLM.
    """
    if st.session_state.use_chat_history:
        chat_history = get_chat_history()
        if chat_history != []:
            question_summary = make_chat_history_summary(chat_history, user_question)
            prompt_context, results = query_cortex_search_service(
                question_summary, columns=["chunk", "source_url", "document_title"]
            )
        else:
            prompt_context, results = query_cortex_search_service(
                user_question, columns=["chunk", "source_url", "document_title"]
            )
    else:
        prompt_context, results = query_cortex_search_service(
            user_question, columns=["chunk", "source_url", "document_title"]
        )
        chat_history = ""

    prompt = f"""
You are a helpful AI assistant with RAG capabilities. When a user asks you a question, you will also be given excerpts from relevant documentation to help answer the question accurately. Please use the context provided and cite your sources using the citation format provided.

Context from documentation:
{prompt_context}

User question:
{user_question}

OUTPUT:
"""

    # Add prompt to debug window
    if st.session_state.get("debug", False):
        st.sidebar.text_area("Complete Prompt", prompt, height=300)

    return prompt, results

def post_process_citations(generated_response, results):
    """
    Replace {{.StartCitation}}X{{.EndCitation}} with bracketed references to actual product links.

    NOTE: If the model references chunks out of range (like 4 if only 2 exist),
    consider adding logic to remap or drop invalid references.
    """
    used_results = set()
    for i, ref in enumerate(results):
        old_str = f"{{.StartCitation}}{i+1}{{.EndCitation}}"
        replacement = f"[{i+1}]{ref['source_url']})"
        new_resp = generated_response.replace(old_str, replacement)
        if new_resp != generated_response:
            used_results.add(i)
        generated_response = new_resp
    return generated_response, used_results

# ------------------------------------------------------------------------------
# (2) Main Application (with improved UI)
# ------------------------------------------------------------------------------

def main():
    # Optional: wide layout, custom page title
    st.set_page_config(
        page_title="Cortex Knowledge Extension Chat Tester",
        layout="wide",
    )

    # Optional: a bit of custom CSS for bubble spacing
    custom_css = """
    <style>
    [data-testid="stChatMessage"] {
        border-radius: 8px;
        margin-bottom: 1rem;
        padding: 10px;
    }
    </style>
    """
    st.markdown(custom_css, unsafe_allow_html=True)

    # Title or subheader for your app
    st.subheader("Cortex Knowledge Extension Chat Tester")

    # Initialize metadata and config
    init_service_metadata()
    init_config_options()
    init_messages()

    # Icons for user/assistant
    icons = {"assistant": "❄️", "user": "👤"}

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        with st.chat_message(message["role"], avatar=icons[message["role"]]):
            st.markdown(message["content"])

    # If there are no services, disable chat
    disable_chat = (
        "service_metadata" not in st.session_state
        or len(st.session_state.service_metadata) == 0
    )

    # Chat input
    if question := st.chat_input("Ask a question...", disabled=disable_chat):
        # 1. Store user message
        st.session_state.messages.append({"role": "user", "content": question})

        # 2. Display user bubble
        with st.chat_message("user", avatar=icons["user"]):
            st.markdown(question.replace("$", "\\$"))

        # 3. Prepare assistant response
        with st.chat_message("assistant", avatar=icons["assistant"]):
            message_placeholder = st.empty()

            # Clean the question
            question_safe = question.replace("'", "")

            # Build prompt and retrieve docs
            prompt, results = create_prompt(question_safe)

            with st.spinner("Thinking..."):
                generated_response = complete(st.session_state.model_name, prompt)

                # Post-process citations
                post_processed_response, used_results = post_process_citations(generated_response, results)

                # Build references table (only if there are results)
                if results:
                    markdown_table = "\n\n###### References \n\n| Index | Title | Source |\n|------|-------|--------|\n"
                    for i, ref in enumerate(results):
                        # Include all references that were found
                        markdown_table += (
                            f"| {i+1} | {ref.get('document_title', 'N/A')} | "
                            f"{ref.get('source_url', 'N/A')} |\n"
                        )
                else:
                    markdown_table = "\n\n*No references found*"

                # Show final assistant message (with references)
                message_placeholder.markdown(post_processed_response + markdown_table)

        # 4. Append final assistant message to chat history
        st.session_state.messages.append(
            {"role": "assistant", "content": post_processed_response + markdown_table}
        )

# ------------------------------------------------------------------------------
# (3) Entry Point
# ------------------------------------------------------------------------------
if __name__ == "__main__":
    session = get_active_session()
    root = Root(session)
    main()
Copy

Étape 3 : Tester l’application

  1. Cliquez sur Run pour lancer l’application Streamlit.

  2. Sélectionnez un CKE dans le menu déroulant du volet gauche sous Select Cortex Knowledge Extension.`

  3. Posez une question dans la zone de texte du chat.

Une capture d'écran montrant la zone de texte du chat « Posez une question ».