LlamaIndex 中的多智能体模式#
当需要多个专家协同完成任务时,LlamaIndex 提供多种选择方案,每种方案都在便利性和灵活性之间进行权衡。本文介绍三种最常见模式的使用场景选择,并为每种方法提供最小化代码示例。
- AgentWorkflow(内置模式) – 声明一组智能体,由
AgentWorkflow
管理控制权转移。章节 完整 Notebook - 协调器模式(内置模式) – 由"协调器"智能体选择调用哪个子智能体;这些子智能体作为工具暴露给协调器。章节 完整 Notebook
- 自定义规划器(DIY模式) – 自行编写LLM提示(通常为XML/JSON格式)来规划执行序列,并在代码中显式调用智能体。章节 完整 Notebook
模式1 – AgentWorkflow(线性"群"模式)#
适用场景 – 希望几乎无需额外代码即可获得开箱即用的多智能体行为,且满足于 AgentWorkflow
默认的控制转移启发式规则。
AgentWorkflow
本身是一个预配置好的工作流,专为理解智能体、状态和工具调用而设计。您只需提供一组智能体数组并指定起始智能体,它将:
- 将用户消息传递给根智能体
- 执行该智能体选择的任何工具
- 允许智能体在决策时将控制权"移交"给另一个智能体
- 重复上述过程直到有智能体返回最终答案
注意: 当前活跃智能体可随时选择将控制权交还给用户。
以下是多智能体报告生成示例的浓缩版。三个智能体协作完成研究、撰写和审阅报告的工作。(…
表示为简洁省略的代码)
from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent
# --- 创建专业智能体 ------------------------------------------------
research_agent = FunctionAgent(
name="ResearchAgent",
description="Search the web and record notes.",
system_prompt="You are a researcher… hand off to WriteAgent when ready.",
llm=llm,
tools=[search_web, record_notes],
can_handoff_to=["WriteAgent"],
)
write_agent = FunctionAgent(
name="WriteAgent",
description="Writes a markdown report from the notes.",
system_prompt="You are a writer… ask ReviewAgent for feedback when done.",
llm=llm,
tools=[write_report],
can_handoff_to=["ReviewAgent", "ResearchAgent"],
)
review_agent = FunctionAgent(
name="ReviewAgent",
description="Reviews a report and gives feedback.",
system_prompt="You are a reviewer…", # etc.
llm=llm,
tools=[review_report],
can_handoff_to=["WriteAgent"],
)
# --- 组合智能体 ----------------------------------------------------------
agent_workflow = AgentWorkflow(
agents=[research_agent, write_agent, review_agent],
root_agent=research_agent.name,
initial_state={
"research_notes": {},
"report_content": "Not written yet.",
"review": "Review required.",
},
)
resp = await agent_workflow.run(
user_msg="Write me a report on the history of the web …"
)
print(resp)
AgentWorkflow
会处理所有协调工作,并实时推送事件流以便您向用户展示进度。
模式2 – 协调器智能体(子智能体作为工具)#
适用场景 – 需要一个集中决策点来控制每个步骤以便注入自定义逻辑,但仍希望保持声明式的智能体即工具体验而非自行编写规划器。
在此模式中,您仍需构建专业智能体(ResearchAgent
、WriteAgent
、ReviewAgent
),但不让它们相互移交控制权。而是将每个智能体的 run
方法作为工具暴露出来,并将这些工具提供给一个新的顶层智能体——协调器。
完整示例参见agents_as_tools notebook。
import re
from llama_index.core.agent.workflow import FunctionAgent
from llama_index.core.workflow import Context
# 假设 research_agent / write_agent / review_agent 已按之前方式定义
# 但我们实际上只需要最基本的 `search_web` 工具
async def call_research_agent(ctx: Context, prompt: str) -> str:
"""用于根据特定提示记录研究笔记"""
result = await research_agent.run(
user_msg=f"Write some notes about the following: {prompt}"
)
state = await ctx.store.get("state")
state["research_notes"].append(str(result))
await ctx.store.set("state", state)
return str(result)
async def call_write_agent(ctx: Context) -> str:
"""用于根据研究笔记撰写报告或根据反馈修改报告"""
state = await ctx.store.get("state")
notes = state.get("research_notes", None)
if not notes:
return "No research notes to write from."
user_msg = f"Write a markdown report from the following notes. Be sure to output the report in the following format: <report>...</report>:\n\n"
# 如果存在反馈则添加到用户消息中
feedback = state.get("review", None)
if feedback:
user_msg += f"<feedback>{feedback}</feedback>\n\n"
# 将研究笔记添加到用户消息
notes = "\n\n".join(notes)
user_msg += f"<research_notes>{notes}</research_notes>\n\n"
# 运行写作智能体
result = await write_agent.run(user_msg=user_msg)
report = re.search(r"<report>(.*)</report>", str(result), re.DOTALL).group(
1
)
state["report_content"] = str(report)
await ctx.store.set("state", state)
return str(report)
async def call_review_agent(ctx: Context) -> str:
"""用于审阅报告并提供反馈"""
state = await ctx.store.get("state")
report = state.get("report_content", None)
if not report:
return "No report content to review."
result = await review_agent.run(
user_msg=f"Review the following report: {report}"
)
state["review"] = result
await ctx.store.set("state", state)
return result
orchestrator = FunctionAgent(
system_prompt=(
"You are an expert in the field of report writing. "
"You are given a user request and a list of tools that can help with the request. "
"You are to orchestrate the tools to research, write, and review a report on the given topic. "
"Once the review is positive, you should notify the user that the report is ready to be accessed."
),
llm=orchestrator_llm,
tools=[
call_research_agent,
call_write_agent,
call_review_agent,
],
initial_state={
"research_notes": [],
"report_content": None,
"review": None,
},
)
response = await orchestrator.run(
user_msg="Write me a report on the history of the web …"
)
print(response)
由于协调器本身也是 FunctionAgent
,您可免费获得流式传输、工具调用和状态管理功能,同时保持对智能体调用方式和整体控制流的完全掌控(工具总是返回协调器)。
模式3 – 自定义规划器(DIY提示+解析)#
适用场景 – 需要终极灵活性。当您需要强制使用特定计划格式、与外部调度器集成或收集前两种模式无法开箱即提供的额外元数据时。
此模式的核心思想是编写提示词指导LLM输出结构化计划(XML/JSON/YAML)。您的Python代码解析该计划并显式执行。下级智能体可以是任何形式——FunctionAgent
、RAG流程或其他服务。
以下是工作流的最小化示例,能够规划、执行计划并判断是否需要后续步骤。完整示例参见custom_multi_agent notebook。
import re
import xml.etree.ElementTree as ET
from pydantic import BaseModel, Field
from typing import Any, Optional
from llama_index.core.llms import ChatMessage
from llama_index.core.workflow import (
Context,
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
# 假设我们已创建调用智能体的辅助函数
PLANNER_PROMPT = """You are a planner chatbot.
Given a user request and the current state, break the solution into ordered <step> blocks. Each step must specify the agent to call and the message to send, e.g.
<plan>
<step agent=\"ResearchAgent\">search for …</step>
<step agent=\"WriteAgent\">draft a report …</step>
...
</plan>
<state>
{state}
</state>
<available_agents>
{available_agents}
</available_agents>
The general flow should be:
- Record research notes
- Write a report
- Review the report
- Write the report again if the review is not positive enough
If the user request does not require any steps, you can skip the <plan> block and respond directly.
"""
class InputEvent(StartEvent):
user_msg: Optional[str] = Field(default=None)
chat_history: list[ChatMessage]
state: Optional[dict[str, Any]] = Field(default=None)
class OutputEvent(StopEvent):
response: str
chat_history: list[ChatMessage]
state: dict[str, Any]
class StreamEvent(Event):
delta: str
class PlanEvent(Event):
step_info: str
# 计划模型
class PlanStep(BaseModel):
agent_name: str
agent_input: str
class Plan(BaseModel):
steps: list[PlanStep]
class ExecuteEvent(Event):
plan: Plan
chat_history: list[ChatMessage]
class PlannerWorkflow(Workflow):
llm: OpenAI = OpenAI(
model="o3-mini",
api_key="sk-proj-...",
)
agents: dict[str, FunctionAgent] = {
"ResearchAgent": research_agent,
"WriteAgent": write_agent,
"ReviewAgent": review_agent,
}
@step
async def plan(
self, ctx: Context, ev: InputEvent
) -> ExecuteEvent | OutputEvent:
# 如果存在初始状态则设置
if ev.state:
await ctx.store.set("state", ev.state)
chat_history = ev.chat_history
if ev.user_msg:
user_msg = ChatMessage(
role="user",
content=ev.user_msg,
)
chat_history.append(user_msg)
# 注入包含状态和可用智能体的系统提示
state = await ctx.store.get("state")
available_agents_str = "\n".join(
[
f'<agent name="{agent.name}">{agent.description}</agent>'
for agent in self.agents.values()
]
)
system_prompt = ChatMessage(
role="system",
content=PLANNER_PROMPT.format(
state=str(state),
available_agents=available_agents_str,
),
)
# 从llm流式获取响应
response = await self.llm.astream_chat(
messages=[system_prompt] + chat_history,
)
full_response = ""
async for chunk in response:
full_response += chunk.delta or ""
if chunk.delta:
ctx.write_event_to_stream(
StreamEvent(delta=chunk.delta),
)
# 将响应解析为计划并决定执行或输出
xml_match = re.search(r"(<plan>.*</plan>)", full_response, re.DOTALL)
if not xml_match:
chat_history.append(
ChatMessage(
role="assistant",
content=full_response,
)
)
return OutputEvent(
response=full_response,
chat_history=chat_history,
state=state,
)
else:
xml_str = xml_match.group(1)
root = ET.fromstring(xml_str)
plan = Plan(steps=[])
for step in root.findall("step"):
plan.steps.append(
PlanStep(
agent_name=step.attrib["agent"],
agent_input=step.text.strip() if step.text else "",
)
)
return ExecuteEvent(plan=plan, chat_history=chat_history)
@step
async def execute(self, ctx: Context, ev: ExecuteEvent) -> InputEvent:
chat_history = ev.chat_history
plan = ev.plan
for step in plan.steps:
agent = self.agents[step.agent_name]
agent_input = step.agent_input
ctx.write_event_to_stream(
PlanEvent(
step_info=f'<step agent="{step.agent_name}">{step.agent_input}</step>'
),
)
if step.agent_name == "ResearchAgent":
await call_research_agent(ctx, agent_input)
elif step.agent_name == "WriteAgent":
# 注意:我们不传递计划中的输入
# 而是使用状态驱动写作智能体
await call_write_agent(ctx)
elif step.agent_name == "ReviewAgent":
await call_review_agent(ctx)
state = await ctx.store.get("state")
chat_history.append(
ChatMessage(
role="user",
content=f"I've completed the previous steps, here's the updated state:\n\n<state>\n{state}\n</state>\n\nDo you need to continue and plan more steps?, If not, write a final response.",
)
)
return InputEvent(
chat_history=chat_history,
)
这种方法意味着由您掌控协调循环,因此可以插入任何需要的自定义逻辑、缓存或人工干预检查。
模式选择指南#
模式 | 代码量 | 灵活性 | 内置流式/事件 |
---|---|---|---|
AgentWorkflow | ⭐ – 最少 | ★★ | 支持 |
协调器智能体 | ⭐⭐ | ★★★ | 支持(通过协调器) |
自定义规划器 | ⭐⭐⭐ | ★★★★★ | 支持(通过子智能体)。顶层由您决定。 |
快速原型开发时,建议从 AgentWorkflow
开始。当需要更多序列控制时转向协调器智能体。仅当前两种模式无法满足流程需求时,才选择自定义规划器。