背景简介
在人工智能快速发展的今天,单一Agent已经难以满足复杂应用场景的需求。多Agent协同系统通过将不同职责分配给专门的Agent,实现了更高效、更可靠的智能服务。本文将详细介绍一个完整的多Agent协同架构,包括路由Agent、执行Agent、验证Agent和响应Agent的模块化设计与实现。
前置信息
详细信息
架构设计
我们设计了一个四Agent协同的模块化架构,每个Agent承担特定的职责:
这种设计遵循了单一职责原则,每个Agent专注于自己的核心功能,通过清晰的接口进行协作。
系统依赖
# 核心依赖
ollama>=0.1.0
asyncio
typing
# 可选依赖
fastapi>=0.68.0
uvicorn>=0.15.0
pydantic>=1.8.0
# 开发依赖
pytest>=6.2.0
pytest-asyncio>=0.15.0
black>=21.0.0
flake8>=3.9.0
Router Agent: 智能路由分发
Router Agent是整个系统的入口,负责分析用户意图并将请求分类到合适的处理模式,通过精心设计的提示词,能够准确识别用户意图,为后续处理提供正确的上下文。
- Router 代码
# app/ai_agent/agent_router.py
from typing import Callable
from app.core.pers_ollama import OllamaClient
class RouterAgent:
"""路由Agent:负责分析用户意图,分类为不同模式"""
ROUTER_PROMPT = (
"意图分类(只输出模式名):\n"
"- CHAT: 闲聊、常识\n"
"- WEATHER: 查天气\n"
"- CALCULATOR: 算数\n\n"
"示例:\n"
"用户: 月亮为什么圆 -> CHAT\n"
"用户: 上海冷不冷 -> WEATHER\n"
"用户: 123*456 -> CALCULATOR\n\n"
"用户问题: {user_message}\n模式:"
)
def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3.1:8b"):
self.client = OllamaClient(base_url=base_url, model=model, temperature=0.2)
self.model = model
async def classify(self, user_message: str) -> str:
"""
分析用户意图并返回对应的模式
Args:
user_message: 用户输入的消息
Returns:
str: 模式名称 (CHAT/WEATHER/CALCULATOR)
"""
response = await self.client.chat(
messages=[{"role": "user", "content": self.ROUTER_PROMPT.format(user_message=user_message)}],
tools=[]
)
content = response.content.strip().upper()
if "CALC" in content:
return "CALCULATOR"
if "WEATHER" in content:
return "WEATHER"
return "CHAT"
async def close(self):
"""关闭客户端连接"""
await self.client.close()
Executor Agent:专业工具执行
Executor Agent负责根据Router的分类结果,提取参数并调用相应的工具:
- Executor 代码
# app/ai_agent/agent_executor.py
from typing import List, Dict, Any
from app.core.pers_ollama import OllamaClient
class ExecutorAgent:
"""执行Agent:负责提取参数并调用指定工具"""
EXECUTOR_SYSTEM_PROMPT = (
"你是工具调用专家。你的任务是提取参数并调用指定工具。\n"
"当前模式:{tool_type}\n\n"
"重要规则:\n"
"1. 必须使用系统提供的工具。\n"
"2. 【中文城市名处理】:如果用户问的是中文城市名(如兰州、上海),"
"必须**逐字复制**用户问题中的城市名作为参数,严禁翻译、拼音化或猜测。\n\n"
"【示例】\n"
"用户: 兰州的天气热吗? -> 调用 tool_get_weather(city='兰州')\n"
"用户: 上海冷不冷? -> 调用 tool_get_weather(city='上海')\n"
"用户: 123 * 456 是多少? -> 调用 tool_calculator(expression='123 * 456')\n\n"
"3. 如果收到错误反馈,请根据反馈修正参数。"
)
def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3.1:8b"):
self.client = OllamaClient(base_url=base_url, model=model, temperature=0.2)
self.model = model
async def execute(
self,
user_message: str,
tool_type: str,
history_msgs: List[Dict[str, str]] = None
):
"""
提取参数并执行工具调用
Args:
user_message: 用户输入的消息
tool_type: 工具类型 (WEATHER/CALCULATOR)
history_msgs: 历史对话消息(用于修正参数)
Returns:
ChatResponse: 包含工具调用信息的响应对象
"""
if history_msgs is None:
history_msgs = []
system_content = self.EXECUTOR_SYSTEM_PROMPT.format(tool_type=tool_type)
if history_msgs:
messages = [{"role": "system", "content": system_content}] + history_msgs
else:
messages = [
{"role": "system", "content": system_content},
{"role": "user", "content": user_message}
]
# 根据工具类型筛选可用的工具
from app.ai_agent.agent_tools import TOOLS
allowed_tools = []
if tool_type == "WEATHER":
allowed_tools = [t for t in TOOLS if t["function"]["name"] == "tool_get_weather"]
elif tool_type == "CALCULATOR":
allowed_tools = [t for t in TOOLS if t["function"]["name"] == "tool_calculator"]
return await self.client.chat(messages=messages, tools=allowed_tools)
async def close(self):
"""关闭客户端连接"""
await self.client.close()
Validator Agent:质量保证专家
Validator Agent作为质量把关者,确保工具调用的正确性:
- Validator 代码
# app/agent_validator.py
from typing import Tuple
from app.core.pers_ollama import OllamaClient
class ValidatorAgent:
"""验证Agent:负责审核工具调用的参数和结果是否正确"""
VALIDATOR_PROMPT = (
"你是参数审核员。请判断工具调用是否成功且符合用户意图。\n"
"你需要查看:【用户问题】、【工具参数】、【工具执行结果】。\n\n"
"输出要求:\n"
"1. 成功:只输出 'PASS'。\n"
"2. 失败:输出 'FAIL: 原因'。\n\n"
"【示例】\n"
"示例 1 (正确提取):\n"
"用户: 兰州的天气\n"
"参数: {{'city': '兰州'}}\n"
"结果: 晴,25度\n"
"结果: PASS\n\n"
"示例 2 (提取错误导致未找到城市):\n"
"用户: 兰州的天气\n"
"参数: {{'city': '勗安'}}\n"
"结果: 未找到城市: 勗安\n"
"结果: FAIL: 工具未找到城市 '勗安'。用户问的是 '兰州',请直接使用 '兰州' 作为参数。\n\n"
"示例 3 (中文乱码/错字):\n"
"用户: 上海天气\n"
"参数: {{'city': '上晦'}}\n"
"结果: 无法获取天气信息\n"
"结果: FAIL: 参数城市名 '上晦' 错误,请修正为 '上海'。\n\n"
"示例 4 (错误的表达式):\n"
"用户: 计算一个半径为3的圆的面积\n"
"参数: {{'expression': 'π*3^2'}}\n"
"结果: 表达式无效或包含非法操作:\n"
"结果: FAIL: 表达式非法,转化为纯数字的公式,不使用特殊字符,例如'3.14159 * 3 * 3'。\n\n"
"【当前任务】\n"
"用户: {user_message}\n"
"参数: {tool_args}\n"
"工具结果: {tool_result}\n"
"结果:"
)
def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3.1:8b"):
self.client = OllamaClient(base_url=base_url, model=model, temperature=0.2)
self.model = model
async def validate(
self,
user_message: str,
tool_name: str,
tool_args: dict,
tool_type: str,
tool_result_str: str
) -> Tuple[bool, str]:
"""
验证工具调用的参数和结果是否正确
Args:
user_message: 用户输入的消息
tool_name: 工具名称
tool_args: 工具调用参数
tool_type: 工具类型
tool_result_str: 工具执行结果
Returns:
Tuple[bool, str]: (是否通过验证, 验证反馈信息)
"""
prompt = self.VALIDATOR_PROMPT.format(
user_message=user_message,
tool_args=tool_args,
tool_result=tool_result_str
)
response = await self.client.chat(
messages=[{"role": "user", "content": prompt}],
tools=[]
)
content = response.content.strip()
if "PASS" in content.upper():
return True, "PASS"
else:
return False, content
async def close(self):
"""关闭客户端连接"""
await self.client.close()
Responder Agent:友好响应生成
Responder Agent负责将工具执行结果转化为自然、友好的用户回复:
- Responder 代码
# app/ai_agent/agent_responder.py
from app.core.pers_ollama import OllamaClient
class ResponderAgent:
"""响应Agent:负责根据工具结果生成友好的自然语言回答"""
RESPONDER_PROMPT = (
"你是一个友好的助手。请根据工具结果回答用户的问题。\n"
"用户问题:{user_message}\n"
"工具返回结果:{tool_result}\n\n"
"请用自然、口语化的中文回答。"
)
def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3.1:8b"):
self.client = OllamaClient(base_url=base_url, model=model, temperature=0.2)
self.model = model
async def generate_response_stream(self, user_message: str, tool_result: str):
"""
生成流式回答
Args:
user_message: 用户输入的消息
tool_result: 工具执行结果
Returns:
流式迭代器,逐步生成回答内容
"""
prompt = self.RESPONDER_PROMPT.format(
user_message=user_message,
tool_result=tool_result
)
# 获取流式迭代器
stream = await self.client.chat(
messages=[{"role": "user", "content": prompt}],
tools=[],
stream=True
)
return stream
async def close(self):
"""关闭客户端连接"""
await self.client.close()
多 Agent 协同模块
- Multi-Agent 组装代码
import sys
import time
from typing import Dict, Callable
from app.ai_agent.agent_tools import tool_calculator, tool_get_weather
from app.ai_agent.agent_tools import TOOLS
from app.ai_agent.agent_router import RouterAgent
from app.ai_agent.agent_executor import ExecutorAgent
from app.ai_agent.agent_validator import ValidatorAgent
from app.ai_agent.agent_responder import ResponderAgent
# ========== 工具注册表 ==========
TOOL_REGISTRY: Dict[str, Callable] = {
"tool_get_weather": tool_get_weather,
"tool_calculator": tool_calculator,
}
class MultiAgentOrchestrator:
"""多Agent协同编排器:协调Router、Executor、Validator、Responder四个Agent的工作流"""
def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3.1:8b"):
self.router = RouterAgent(base_url=base_url, model=model)
self.executor = ExecutorAgent(base_url=base_url, model=model)
self.validator = ValidatorAgent(base_url=base_url, model=model)
self.responder = ResponderAgent(base_url=base_url, model=model)
self.model = model
async def close(self):
"""关闭所有Agent的客户端连接"""
await self.router.close()
await self.executor.close()
await self.validator.close()
await self.responder.close()
async def chat(self, user_message: str) -> str:
"""
主对话流程:Router -> Executor -> Validator -> Responder
Args:
user_message: 用户输入的消息
Returns:
str: 最终的完整回答
"""
print(f"\n👤 用户: {user_message}")
print("-" * 60)
# 1. Router - 意图分类
print("🧠 [Router] 分析意图...")
mode = await self.router.classify(user_message)
print(f"🧠 [Router] 模式: {mode}")
if mode == "CHAT":
# 流式输出闲聊回答
print("💬 [Responder] ", end="", flush=True)
stream = await self.responder.generate_response_stream(
user_message,
"这是一个常识问题,不需要工具。"
)
full_response = ""
async for chunk in stream:
print(chunk, end="", flush=True)
full_response += chunk
print()
return full_response
# 2. Executor -> Validator 循环(最多尝试5次)
conversation_history = []
for attempt in range(5):
print(f"🔧 [Executor] 尝试 {attempt + 1}...", end=" ")
exec_response = await self.executor.execute(user_message, mode, conversation_history)
if not exec_response.tool_calls:
print(f"❌ 没有生成工具调用")
if attempt == 0:
conversation_history.append({
"role": "user",
"content": "你没有调用工具!请立即调用工具,不要只用文字描述。"
})
continue
else:
return "Executor 执行失败,模型拒绝调用工具。"
tool_call = exec_response.tool_calls[0]
tool_name = tool_call["function"]["name"]
tool_args = tool_call["function"]["arguments"]
print(f"→ 调用 {tool_name}({tool_args})")
# 真实执行工具
try:
tool_result = TOOL_REGISTRY[tool_name](**tool_args)
tool_result_str = str(tool_result)
print(f"✅ [Result] {tool_result_str[:80]}{'...' if len(tool_result_str) > 80 else ''}")
except Exception as e:
tool_result = f"工具执行报错: {str(e)}"
tool_result_str = tool_result
print(f"❌ [Result] {tool_result_str[:80]}")
# 3. Validator - 验证工具调用结果
print("🔍 [Validator] ", end="", flush=True)
is_valid, feedback = await self.validator.validate(
user_message,
tool_name,
tool_args,
mode,
tool_result_str
)
# 在 MultiAgentOrchestrator.chat() 中修改 Responder 部分
if is_valid:
print("✅ 通过\n")
# 4. Responder - 流式输出最终回答
print("💬 [Responder] ", end="", flush=True)
stream = await self.responder.generate_response_stream(user_message, str(tool_result))
full_response = ""
chunk_count = 0
async for chunk in stream:
chunk_count += 1
# 调试输出
# sys.stdout.write(f"[{chunk_count}:{len(chunk)}]")
sys.stdout.flush()
# time.sleep(0.05) # 测试用:人工延迟,观察是否是速度问题
sys.stdout.write(chunk)
sys.stdout.flush()
full_response += chunk
# sys.stdout.write("\n")
sys.stdout.flush()
return full_response
else:
print(f"❌ 拒绝: {feedback}")
conversation_history.append({
"role": "user",
"content": f"上一次调用失败。原因:{feedback}。请修正参数并再次调用工具。"
})
return "抱歉,尝试多次后仍未通过验证。"
# ================= 测试入口 =================
async def main():
bot = MultiAgentOrchestrator()
test_questions = [
"上海今天天气如何?",
"计算一个半径为3的圆的面积",
"半径为5cm的圆的周长是多少?",
"计算一个半径为12,高为2cm的圆锥的体积",
"Give me a weather introduction for a random city of China."
]
print("🚀 四 Agent 协同 (流式输出版)")
print("=" * 60)
try:
for q in test_questions:
answer = await bot.chat(q)
print("\n","=" * 60,"\n",end="")
finally:
await bot.close()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
功能亮点:
- 模块化设计:每个Agent职责单一,易于维护和扩展
- 错误恢复机制:通过Executor-Validator循环,自动修正参数错误
- 流式输出:提供实时响应,改善用户体验
实际运行效果
🚀 四 Agent 协同 (流式输出版)
============================================================
👤 用户: 上海今天天气如何?
------------------------------------------------------------
🧠 [Router] 分析意图...
🧠 [Router] 模式: WEATHER
🔧 [Executor] 尝试 1... → 调用 tool_get_weather({'city': '北京'})
✅ [Result] 城市: 北京
温度: 5.8°C
体感温度: 2.8°C
湿度: 67%
风速: 6.2 km/h
天气: 小雨
🔍 [Validator] ❌ 拒绝: FAIL: 工具返回的是北京的天气,而用户问的是上海的天气。工具参数中的 'city' 应该是 '上海'。
🔧 [Executor] 尝试 2... → 调用 tool_get_weather({'city': '北京'})
✅ [Result] 城市: 北京
温度: 5.8°C
体感温度: 2.8°C
湿度: 67%
风速: 6.2 km/h
天气: 小雨
🔍 [Validator] ❌ 拒绝: 根据提供的信息,我可以判断工具调用是否成功且符合用户意图。
【用户问题】:上海今天天气如何?
【工具参数】:{'city': '北京'}
【工具执行结果】:
城市: 北京
温度: 5.8°C
体感温度: 2.8°C
湿度: 67%
风速: 6.2 km/h
天气: 小雨
由于用户询问的是上海的天气,但工具参数中使用的是北京,因此我认为这是一个错误的调用。
输出:FAIL: 用户问题是 '上海今天天气如何?',但工具参数中使用的是 '北京'。
🔧 [Executor] 尝试 3... → 调用 tool_get_weather({'city': '北京'})
✅ [Result] 城市: 北京
温度: 5.8°C
体感温度: 2.8°C
湿度: 67%
风速: 6.2 km/h
天气: 小雨
🔍 [Validator] ❌ 拒绝: 根据用户的问题和工具的执行结果,我可以判断出这个调用是失败的。
原因:用户问的是上海今天的天气,但参数中却填写了北京。因此,工具返回的是北京的天气信息,而不是上海的。
输出:FAIL: 参数城市名 '北京' 错误,请修正为 '上海'。
🔧 [Executor] 尝试 4... → 调用 tool_get_weather({'city': '上海'})
✅ [Result] 城市: 上海
温度: 2.0°C
体感温度: -2.9°C
湿度: 54%
风速: 13.3 km/h
天气: 晴朗
🔍 [Validator] ✅ 通过
💬 [Responder] 上海今天的天气不错,阳光明媚,温度也还算舒适,目前是2摄氏度左右,感觉有点冷,湿度也不是很高,风速中等。
============================================================
👤 用户: 计算一个半径为3的圆的面积
------------------------------------------------------------
🧠 [Router] 分析意图...
🧠 [Router] 模式: CALCULATOR
🔧 [Executor] 尝试 1... → 调用 tool_calculator({'expression': '3.14 * 3^2'})
✅ [Result] 表达式无效或包含非法操作: 不支持的二元操作符: <class 'ast.BitXor'>
🔍 [Validator] ❌ 拒绝: FAIL: 表达式非法,转化为纯数字的公式,不使用特殊字符,例如'3.14159 * 3 * 3'。
🔧 [Executor] 尝试 2... → 调用 tool_calculator({'expression': '3*3'})
✅ [Result] 9
🔍 [Validator] ✅ 通过
💬 [Responder] 这个问题很简单!根据数学公式,圆的面积是πr^2(π是圆周率,约等于3.14),其中r是半径。所以,如果半径为3,那么圆的面积就是3.14*3*3=28.26。工具返回结果9可能是由于计算精度问题或工具本身存在错误。
============================================================
👤 用户: 半径为5cm的圆的周长是多少?
------------------------------------------------------------
🧠 [Router] 分析意图...
🧠 [Router] 模式: CALCULATOR
🔧 [Executor] 尝试 1... → 调用 tool_calculator({'expression': '2 * 3.14 * 5'})
✅ [Result] 31.400000000000002
🔍 [Validator] ✅ 通过
💬 [Responder] 这个圆的周长是 31.4 厘米。
============================================================
👤 用户: 计算一个半径为12,高为2cm的圆锥的体积
------------------------------------------------------------
🧠 [Router] 分析意图...
🧠 [Router] 模式: CALCULATOR
🔧 [Executor] 尝试 1... → 调用 tool_calculator({'expression': 'π * 12^2 * 2 / 3'})
✅ [Result] 表达式无效或包含非法操作: 非法的语法结构: Name
🔍 [Validator] ❌ 拒绝: 根据用户问题和工具执行结果,判断工具调用是否成功且符合用户意图。
用户问题:计算一个半径为12,高为2cm的圆锥的体积
参数:{'expression': 'π * 12^2 * 2 / 3'}
工具结果:表达式无效或包含非法操作: 非法的语法结构: Name
由于用户问题和参数都正确,但工具执行结果显示有错误,因此需要分析原因。
原因分析:
1. 用户问题清晰,参数也正确。
2. 工具执行结果显示“非法的语法结构:Name”,这意味着工具可能无法识别或解析表达式中的变量名“Name”。
解决方案:
1. 检查用户问题和参数是否有错误或遗漏。
2. 确认工具支持的数学运算符和函数是否正确使用。
3. 尝试修正表达式,例如将“Name”替换为具体数值。
最终结果:FAIL: 表达式非法,需要修正为 'π * 12^2 * 2 / 3'
🔧 [Executor] 尝试 2... → 调用 tool_calculator({'expression': 'π * 12² * 2 / 3'})
✅ [Result] 表达式无效或包含非法操作: invalid character '²' (U+00B2) (<unknown>, line 1)
🔍 [Validator] ❌ 拒绝: 根据示例要求,我需要检查用户问题、工具参数和工具执行结果来判断工具调用是否成功且符合用户意图。
用户问题:计算一个半径为12,高为2cm的圆锥的体积
工具参数:{'expression': 'π * 12² * 2 / 3'}
工具执行结果:表达式无效或包含非法操作: invalid character '²' (U+00B2) (<unknown>, line 1)
根据工具执行结果,表达式中出现了特殊字符‘²’(U+00B2),这是一个非法操作。因此,我需要输出:
FAIL: 表达式非法,请修正为纯数字的公式,不使用特殊字符,例如'3.14159 * 12 * 12 * 2 / 3'。
🔧 [Executor] 尝试 3... → 调用 tool_calculator({'expression': '3.14159 * 12 * 12 * 2 / 3'})
✅ [Result] 301.59263999999996
🔍 [Validator] ✅ 通过
💬 [Responder] 这个圆锥的体积是301.59立方厘米。
============================================================
👤 用户: Give me a weather introduction for a random city of China.
------------------------------------------------------------
🧠 [Router] 分析意图...
🧠 [Router] 模式: WEATHER
🔧 [Executor] 尝试 1... → 调用 tool_get_weather({'city': '北京'})
✅ [Result] 城市: 北京
温度: 5.8°C
体感温度: 2.8°C
湿度: 67%
风速: 6.2 km/h
天气: 小雨
🔍 [Validator] ✅ 通过
💬 [Responder] 北京今天的天气不太好,外面小雨不断,温度也比较低,仅有5.8摄氏度。体感温度更是只有2.8摄氏度,湿度也较高达67%。风速也有一点儿,达到6.2公里每小时。
以上便是本文的全部内容,感谢您的阅读,如遇到任何问题,欢迎在评论区留言讨论。