Cette rubrique décrit comment diffuser des réponses en temps réel à partir du SDK Agent Cortex Code.
Par défaut, leSDK rend des objets``AssistantMessage`` complets une fois que le modèle a fini de générer chaque réponse. Pour recevoir des mises à jour incrémentielles à mesure que du texte et des blocs de réflexion sont générés, activez le streaming partiel des messages en définissant includePartialMessages (TypeScript) ou include_partial_messages (Python) sur``true``.
Lorsque des messages partiels sont activés, Cortex Code émet des objets StreamEvent pour le texte partiel et le contenu de la réflexion. Les appels complets des outils arrivent toujours en tant qu’objets AssistantMessage et les résultats des outils arrivent toujours sous la forme d’objets UserMessage.
Lorsqu’il est activé, le SDK rend des messages StreamEvent contenant des événements de streaming partiels, en plus des objets AssistantMessage,``UserMessage``, et ResultMessage habituels. Votre code doit :
Vérifier le type de chaque message pour différencier StreamEvent des autres types.
Pour StreamEvent, extraire le champ event et vérifier son type.
Chercher des événements content_block_delta où``delta.type`` est text_delta.
import{query}from"cortex-code-agent-sdk";forawait(constmessageofquery({prompt:"List the files in my project",options:{cwd:process.cwd(),includePartialMessages:true,allowedTools:["Bash","Read"],},})){if(message.type==="stream_event"){constevent=message.event;if(event.type==="content_block_delta"){if(event.delta.type==="text_delta"){process.stdout.write(event.delta.text);}}}}
importasynciofromcortex_code_agent_sdkimportquery,CortexCodeAgentOptionsfromcortex_code_agent_sdk.typesimportStreamEventasyncdefstream_response():asyncformessageinquery(prompt="List the files in my project",options=CortexCodeAgentOptions(cwd=".",include_partial_messages=True,allowed_tools=["Bash","Read"],),):ifisinstance(message,StreamEvent):event=message.eventifevent.get("type")=="content_block_delta":delta=event.get("delta",{})ifdelta.get("type")=="text_delta":print(delta.get("text",""),end="",flush=True)asyncio.run(stream_response())
Lorsque les messages partiels sont activés, vous recevez des événements de streaming bruts encapsulés dans un objet :
interfaceSDKPartialAssistantMessage{type:"stream_event";event:Record<string,unknown>;// Raw streaming eventparent_tool_use_id:string|null;uuid:string;session_id:string;}
@dataclassclassStreamEvent:uuid:str# Unique identifiersession_id:str# Session identifierevent:dict[str,Any]# Raw streaming eventparent_tool_use_id:str|None# Parent tool ID if from a subagent
Le champ event contient l’événement de streaming partiel brut émis par Cortex Code. Types d’événements courants :
Type d’événement.
Description
content_block_start
Début d’un nouveau texte ou d’un bloc de réflexion
content_block_delta
Mise à jour du texte incrémentiel ou de la réflexion
Lorsque les messages partiels sont activés, vous recevez généralement les messages dans l’ordre suivant :
SystemMessage -- session initialization
StreamEvent (content_block_start) -- text or thinking block
StreamEvent (content_block_delta) -- text_delta or thinking_delta chunks...
StreamEvent (content_block_stop)
AssistantMessage -- complete text/thinking block, or complete tool_use block
UserMessage -- complete tool_result block
... more assistant/user turns ...
ResultMessage -- final result
Si les messages partiels ne sont pas activés, vous recevez toujours les mêmes messages d’assistant, d’utilisateur et de résultat, mais pas StreamEvent. Selon la session, le SDK peut également émettre des événements system tels que l’initialisation, l’état et les notifications de tâches en arrière-plan.
Pour afficher le texte tel qu’il est généré, recherchez des événements content_block_delta où delta.type est text_delta :
import{query}from"cortex-code-agent-sdk";forawait(constmessageofquery({prompt:"Explain how databases work",options:{cwd:process.cwd(),includePartialMessages:true},})){if(message.type==="stream_event"){constevent=message.event;if(event.type==="content_block_delta"&&event.delta.type==="text_delta"){process.stdout.write(event.delta.text);}}}console.log();// Final newline
importasynciofromcortex_code_agent_sdkimportquery,CortexCodeAgentOptionsfromcortex_code_agent_sdk.typesimportStreamEventasyncdefstream_text():asyncformessageinquery(prompt="Explain how databases work",options=CortexCodeAgentOptions(cwd=".",include_partial_messages=True),):ifisinstance(message,StreamEvent):event=message.eventifevent.get("type")=="content_block_delta":delta=event.get("delta",{})ifdelta.get("type")=="text_delta":print(delta.get("text",""),end="",flush=True)print()# Final newlineasyncio.run(stream_text())
L’exemple suivant accumule le texte ave streaming dans un tampon local et rend à nouveau la réponse actuelle chaque fois qu’un nouveau text_delta arrive. Dans une application réelle, remplacez la fonction render par la logique de mise à jour de l’état de votre framework :
import{query}from"cortex-code-agent-sdk";letcurrentText="";functionrender(text:string){console.clear();console.log("Assistant:\n");process.stdout.write(text);}forawait(constmessageofquery({prompt:"Explain how databases work",options:{cwd:process.cwd(),includePartialMessages:true,},})){if(message.type==="stream_event"){constevent=message.event;if(event.type==="content_block_delta"&&event.delta.type==="text_delta"){currentText+=event.delta.text;render(currentText);}}elseif(message.type==="result"){console.log("\n\n--- Complete ---");}}
importasyncioimportsysfromcortex_code_agent_sdkimportquery,CortexCodeAgentOptions,ResultMessagefromcortex_code_agent_sdk.typesimportStreamEventdefrender(text:str)->None:sys.stdout.write("\033[2J\033[H")sys.stdout.write("Assistant:\n\n")sys.stdout.write(text)sys.stdout.flush()asyncdefstreaming_ui():current_text=""asyncformessageinquery(prompt="Explain how databases work",options=CortexCodeAgentOptions(cwd=".",include_partial_messages=True,),):ifisinstance(message,StreamEvent):event=message.eventifevent.get("type")=="content_block_delta":delta=event.get("delta",{})ifdelta.get("type")=="text_delta":current_text+=delta.get("text","")render(current_text)elifisinstance(message,ResultMessage):print("\n\n--- Complete ---")asyncio.run(streaming_ui())