Henry
发布于 2026-01-06 / 11 阅读
0
0

Agentic AI - 第三课 - 多Agent协同模块化架构实战

背景简介

在人工智能快速发展的今天,单一Agent已经难以满足复杂应用场景的需求。多Agent协同系统通过将不同职责分配给专门的Agent,实现了更高效、更可靠的智能服务。本文将详细介绍一个完整的多Agent协同架构,包括路由Agent、执行Agent、验证Agent和响应Agent的模块化设计与实现。

前置信息

详细信息

架构设计

我们设计了一个四Agent协同的模块化架构,每个Agent承担特定的职责:

这种设计遵循了单一职责原则,每个Agent专注于自己的核心功能,通过清晰的接口进行协作。

系统依赖

# 核心依赖
ollama>=0.1.0
asyncio
typing

# 可选依赖
fastapi>=0.68.0
uvicorn>=0.15.0
pydantic>=1.8.0

# 开发依赖
pytest>=6.2.0
pytest-asyncio>=0.15.0
black>=21.0.0
flake8>=3.9.0

Router Agent: 智能路由分发

Router Agent是整个系统的入口,负责分析用户意图并将请求分类到合适的处理模式,通过精心设计的提示词,能够准确识别用户意图,为后续处理提供正确的上下文。

  • Router 代码
# app/ai_agent/agent_router.py

from typing import Callable
from app.core.pers_ollama import OllamaClient


class RouterAgent:
    """路由Agent:负责分析用户意图,分类为不同模式"""
    
    ROUTER_PROMPT = (
        "意图分类(只输出模式名):\n"
        "- CHAT: 闲聊、常识\n"
        "- WEATHER: 查天气\n"
        "- CALCULATOR: 算数\n\n"
        "示例:\n"
        "用户: 月亮为什么圆 -> CHAT\n"
        "用户: 上海冷不冷 -> WEATHER\n"
        "用户: 123*456 -> CALCULATOR\n\n"
        "用户问题: {user_message}\n模式:"
    )

    def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3.1:8b"):
        self.client = OllamaClient(base_url=base_url, model=model, temperature=0.2)
        self.model = model

    async def classify(self, user_message: str) -> str:
        """
        分析用户意图并返回对应的模式
        
        Args:
            user_message: 用户输入的消息
            
        Returns:
            str: 模式名称 (CHAT/WEATHER/CALCULATOR)
        """
        response = await self.client.chat(
            messages=[{"role": "user", "content": self.ROUTER_PROMPT.format(user_message=user_message)}], 
            tools=[]
        )
        content = response.content.strip().upper()
        
        if "CALC" in content:
            return "CALCULATOR"
        if "WEATHER" in content:
            return "WEATHER"
        return "CHAT"

    async def close(self):
        """关闭客户端连接"""
        await self.client.close()

Executor Agent:专业工具执行

Executor Agent负责根据Router的分类结果,提取参数并调用相应的工具:

  • Executor 代码
# app/ai_agent/agent_executor.py

from typing import List, Dict, Any
from app.core.pers_ollama import OllamaClient


class ExecutorAgent:
    """执行Agent:负责提取参数并调用指定工具"""
    
    EXECUTOR_SYSTEM_PROMPT = (
        "你是工具调用专家。你的任务是提取参数并调用指定工具。\n"
        "当前模式:{tool_type}\n\n"
        "重要规则:\n"
        "1. 必须使用系统提供的工具。\n"
        "2. 【中文城市名处理】:如果用户问的是中文城市名(如兰州、上海),"
        "必须**逐字复制**用户问题中的城市名作为参数,严禁翻译、拼音化或猜测。\n\n"
        "【示例】\n"
        "用户: 兰州的天气热吗? -> 调用 tool_get_weather(city='兰州')\n"
        "用户: 上海冷不冷? -> 调用 tool_get_weather(city='上海')\n"
        "用户: 123 * 456 是多少? -> 调用 tool_calculator(expression='123 * 456')\n\n"
        "3. 如果收到错误反馈,请根据反馈修正参数。"
    )

    def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3.1:8b"):
        self.client = OllamaClient(base_url=base_url, model=model, temperature=0.2)
        self.model = model

    async def execute(
        self, 
        user_message: str, 
        tool_type: str, 
        history_msgs: List[Dict[str, str]] = None
    ):
        """
        提取参数并执行工具调用
        
        Args:
            user_message: 用户输入的消息
            tool_type: 工具类型 (WEATHER/CALCULATOR)
            history_msgs: 历史对话消息(用于修正参数)
            
        Returns:
            ChatResponse: 包含工具调用信息的响应对象
        """
        if history_msgs is None:
            history_msgs = []
        
        system_content = self.EXECUTOR_SYSTEM_PROMPT.format(tool_type=tool_type)
        
        if history_msgs:
            messages = [{"role": "system", "content": system_content}] + history_msgs
        else:
            messages = [
                {"role": "system", "content": system_content}, 
                {"role": "user", "content": user_message}
            ]
        
        # 根据工具类型筛选可用的工具
        from app.ai_agent.agent_tools import TOOLS
        
        allowed_tools = []
        if tool_type == "WEATHER":
            allowed_tools = [t for t in TOOLS if t["function"]["name"] == "tool_get_weather"]
        elif tool_type == "CALCULATOR":
            allowed_tools = [t for t in TOOLS if t["function"]["name"] == "tool_calculator"]
            
        return await self.client.chat(messages=messages, tools=allowed_tools)

    async def close(self):
        """关闭客户端连接"""
        await self.client.close()

Validator Agent:质量保证专家

Validator Agent作为质量把关者,确保工具调用的正确性:

  • Validator 代码
# app/agent_validator.py

from typing import Tuple
from app.core.pers_ollama import OllamaClient


class ValidatorAgent:
    """验证Agent:负责审核工具调用的参数和结果是否正确"""
    
    VALIDATOR_PROMPT = (
        "你是参数审核员。请判断工具调用是否成功且符合用户意图。\n"
        "你需要查看:【用户问题】、【工具参数】、【工具执行结果】。\n\n"
        "输出要求:\n"
        "1. 成功:只输出 'PASS'。\n"
        "2. 失败:输出 'FAIL: 原因'。\n\n"
        "【示例】\n"
        "示例 1 (正确提取):\n"
        "用户: 兰州的天气\n"
        "参数: {{'city': '兰州'}}\n"
        "结果: 晴,25度\n"
        "结果: PASS\n\n"
        "示例 2 (提取错误导致未找到城市):\n"
        "用户: 兰州的天气\n"
        "参数: {{'city': '勗安'}}\n"
        "结果: 未找到城市: 勗安\n"
        "结果: FAIL: 工具未找到城市 '勗安'。用户问的是 '兰州',请直接使用 '兰州' 作为参数。\n\n"
        "示例 3 (中文乱码/错字):\n"
        "用户: 上海天气\n"
        "参数: {{'city': '上晦'}}\n"
        "结果: 无法获取天气信息\n"
        "结果: FAIL: 参数城市名 '上晦' 错误,请修正为 '上海'。\n\n"
        "示例 4 (错误的表达式):\n"
        "用户: 计算一个半径为3的圆的面积\n"
        "参数: {{'expression': 'π*3^2'}}\n"
        "结果: 表达式无效或包含非法操作:\n"
        "结果: FAIL: 表达式非法,转化为纯数字的公式,不使用特殊字符,例如'3.14159 * 3 * 3'。\n\n"
        "【当前任务】\n"
        "用户: {user_message}\n"
        "参数: {tool_args}\n"
        "工具结果: {tool_result}\n"
        "结果:"
    )

    def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3.1:8b"):
        self.client = OllamaClient(base_url=base_url, model=model, temperature=0.2)
        self.model = model

    async def validate(
        self, 
        user_message: str, 
        tool_name: str, 
        tool_args: dict, 
        tool_type: str, 
        tool_result_str: str
    ) -> Tuple[bool, str]:
        """
        验证工具调用的参数和结果是否正确
        
        Args:
            user_message: 用户输入的消息
            tool_name: 工具名称
            tool_args: 工具调用参数
            tool_type: 工具类型
            tool_result_str: 工具执行结果
            
        Returns:
            Tuple[bool, str]: (是否通过验证, 验证反馈信息)
        """
        prompt = self.VALIDATOR_PROMPT.format(
            user_message=user_message,
            tool_args=tool_args,
            tool_result=tool_result_str
        )
        response = await self.client.chat(
            messages=[{"role": "user", "content": prompt}], 
            tools=[]
        )
        content = response.content.strip()
        
        if "PASS" in content.upper():
            return True, "PASS"
        else:
            return False, content

    async def close(self):
        """关闭客户端连接"""
        await self.client.close()

Responder Agent:友好响应生成

Responder Agent负责将工具执行结果转化为自然、友好的用户回复:

  • Responder 代码
# app/ai_agent/agent_responder.py

from app.core.pers_ollama import OllamaClient


class ResponderAgent:
    """响应Agent:负责根据工具结果生成友好的自然语言回答"""
    
    RESPONDER_PROMPT = (
        "你是一个友好的助手。请根据工具结果回答用户的问题。\n"
        "用户问题:{user_message}\n"
        "工具返回结果:{tool_result}\n\n"
        "请用自然、口语化的中文回答。"
    )

    def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3.1:8b"):
        self.client = OllamaClient(base_url=base_url, model=model, temperature=0.2)
        self.model = model

    async def generate_response_stream(self, user_message: str, tool_result: str):
        """
        生成流式回答
        
        Args:
            user_message: 用户输入的消息
            tool_result: 工具执行结果
            
        Returns:
            流式迭代器,逐步生成回答内容
        """
        prompt = self.RESPONDER_PROMPT.format(
            user_message=user_message,
            tool_result=tool_result
        )
        # 获取流式迭代器
        stream = await self.client.chat(
            messages=[{"role": "user", "content": prompt}], 
            tools=[],
            stream=True
        )
        return stream

    async def close(self):
        """关闭客户端连接"""
        await self.client.close()

多 Agent 协同模块

  • Multi-Agent 组装代码
import sys
import time
from typing import Dict, Callable
from app.ai_agent.agent_tools import tool_calculator, tool_get_weather
from app.ai_agent.agent_tools import TOOLS
from app.ai_agent.agent_router import RouterAgent
from app.ai_agent.agent_executor import ExecutorAgent
from app.ai_agent.agent_validator import ValidatorAgent
from app.ai_agent.agent_responder import ResponderAgent


# ========== 工具注册表 ==========
TOOL_REGISTRY: Dict[str, Callable] = {
    "tool_get_weather": tool_get_weather,
    "tool_calculator": tool_calculator,
}


class MultiAgentOrchestrator:
    """多Agent协同编排器:协调Router、Executor、Validator、Responder四个Agent的工作流"""
    
    def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3.1:8b"):
        self.router = RouterAgent(base_url=base_url, model=model)
        self.executor = ExecutorAgent(base_url=base_url, model=model)
        self.validator = ValidatorAgent(base_url=base_url, model=model)
        self.responder = ResponderAgent(base_url=base_url, model=model)
        self.model = model

    async def close(self):
        """关闭所有Agent的客户端连接"""
        await self.router.close()
        await self.executor.close()
        await self.validator.close()
        await self.responder.close()

    async def chat(self, user_message: str) -> str:
        """
        主对话流程:Router -> Executor -> Validator -> Responder
        
        Args:
            user_message: 用户输入的消息
            
        Returns:
            str: 最终的完整回答
        """
        print(f"\n👤 用户: {user_message}")
        print("-" * 60)

        # 1. Router - 意图分类
        print("🧠 [Router] 分析意图...")
        mode = await self.router.classify(user_message)
        print(f"🧠 [Router] 模式: {mode}")

        if mode == "CHAT":
            # 流式输出闲聊回答
            print("💬 [Responder] ", end="", flush=True)
            stream = await self.responder.generate_response_stream(
                user_message, 
                "这是一个常识问题,不需要工具。"
            )
            full_response = ""
            async for chunk in stream:
                print(chunk, end="", flush=True)
                full_response += chunk
            print()
            return full_response

        # 2. Executor -> Validator 循环(最多尝试5次)
        conversation_history = [] 
        
        for attempt in range(5):
            print(f"🔧 [Executor] 尝试 {attempt + 1}...", end=" ")
            
            exec_response = await self.executor.execute(user_message, mode, conversation_history)
            
            if not exec_response.tool_calls:
                print(f"❌ 没有生成工具调用")
                if attempt == 0:
                    conversation_history.append({
                        "role": "user", 
                        "content": "你没有调用工具!请立即调用工具,不要只用文字描述。"
                    })
                    continue
                else:
                    return "Executor 执行失败,模型拒绝调用工具。"

            tool_call = exec_response.tool_calls[0]
            tool_name = tool_call["function"]["name"]
            tool_args = tool_call["function"]["arguments"]
            
            print(f"→ 调用 {tool_name}({tool_args})")
            
            # 真实执行工具
            try:
                tool_result = TOOL_REGISTRY[tool_name](**tool_args)
                tool_result_str = str(tool_result)
                print(f"✅ [Result] {tool_result_str[:80]}{'...' if len(tool_result_str) > 80 else ''}")
            except Exception as e:
                tool_result = f"工具执行报错: {str(e)}"
                tool_result_str = tool_result
                print(f"❌ [Result] {tool_result_str[:80]}")

            # 3. Validator - 验证工具调用结果
            print("🔍 [Validator] ", end="", flush=True)
            
            is_valid, feedback = await self.validator.validate(
                user_message, 
                tool_name, 
                tool_args, 
                mode, 
                tool_result_str
            )
            
            # 在 MultiAgentOrchestrator.chat() 中修改 Responder 部分
            if is_valid:
                print("✅ 通过\n")
                # 4. Responder - 流式输出最终回答
                print("💬 [Responder] ", end="", flush=True)
                stream = await self.responder.generate_response_stream(user_message, str(tool_result))
                
                full_response = ""
                chunk_count = 0
                async for chunk in stream:
                    chunk_count += 1
                    # 调试输出
                    # sys.stdout.write(f"[{chunk_count}:{len(chunk)}]")
                    sys.stdout.flush()
                    # time.sleep(0.05)  # 测试用:人工延迟,观察是否是速度问题
                    
                    sys.stdout.write(chunk)
                    sys.stdout.flush()
                    full_response += chunk
                # sys.stdout.write("\n")
                sys.stdout.flush()
                return full_response

            else:
                print(f"❌ 拒绝: {feedback}")
                conversation_history.append({
                    "role": "user",
                    "content": f"上一次调用失败。原因:{feedback}。请修正参数并再次调用工具。"
                })

        return "抱歉,尝试多次后仍未通过验证。"


# ================= 测试入口 =================

async def main():
    bot = MultiAgentOrchestrator()
    
    test_questions = [
        "上海今天天气如何?",
        "计算一个半径为3的圆的面积",
        "半径为5cm的圆的周长是多少?",
        "计算一个半径为12,高为2cm的圆锥的体积",
        "Give me a weather introduction for a random city of China."
    ]

    print("🚀 四 Agent 协同 (流式输出版)")
    print("=" * 60)

    try:
        for q in test_questions:
            answer = await bot.chat(q)
            print("\n","=" * 60,"\n",end="")
    finally:
        await bot.close()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

功能亮点:

  • 模块化设计:每个Agent职责单一,易于维护和扩展
  • 错误恢复机制:通过Executor-Validator循环,自动修正参数错误
  • 流式输出:提供实时响应,改善用户体验

实际运行效果

🚀 四 Agent 协同 (流式输出版)
============================================================

👤 用户: 上海今天天气如何?
------------------------------------------------------------
🧠 [Router] 分析意图...
🧠 [Router] 模式: WEATHER
🔧 [Executor] 尝试 1... → 调用 tool_get_weather({'city': '北京'})
✅ [Result] 城市: 北京
温度: 5.8°C
体感温度: 2.8°C
湿度: 67%
风速: 6.2 km/h
天气: 小雨
🔍 [Validator] ❌ 拒绝: FAIL: 工具返回的是北京的天气,而用户问的是上海的天气。工具参数中的 'city' 应该是 '上海'。
🔧 [Executor] 尝试 2... → 调用 tool_get_weather({'city': '北京'})
✅ [Result] 城市: 北京
温度: 5.8°C
体感温度: 2.8°C
湿度: 67%
风速: 6.2 km/h
天气: 小雨
🔍 [Validator] ❌ 拒绝: 根据提供的信息,我可以判断工具调用是否成功且符合用户意图。

【用户问题】:上海今天天气如何?
【工具参数】:{'city': '北京'}
【工具执行结果】:
城市: 北京
温度: 5.8°C
体感温度: 2.8°C
湿度: 67%
风速: 6.2 km/h
天气: 小雨

由于用户询问的是上海的天气,但工具参数中使用的是北京,因此我认为这是一个错误的调用。

输出:FAIL: 用户问题是 '上海今天天气如何?',但工具参数中使用的是 '北京'。
🔧 [Executor] 尝试 3... → 调用 tool_get_weather({'city': '北京'})
✅ [Result] 城市: 北京
温度: 5.8°C
体感温度: 2.8°C
湿度: 67%
风速: 6.2 km/h
天气: 小雨
🔍 [Validator] ❌ 拒绝: 根据用户的问题和工具的执行结果,我可以判断出这个调用是失败的。

原因:用户问的是上海今天的天气,但参数中却填写了北京。因此,工具返回的是北京的天气信息,而不是上海的。

输出:FAIL: 参数城市名 '北京' 错误,请修正为 '上海'。
🔧 [Executor] 尝试 4... → 调用 tool_get_weather({'city': '上海'})
✅ [Result] 城市: 上海
温度: 2.0°C
体感温度: -2.9°C
湿度: 54%
风速: 13.3 km/h
天气: 晴朗
🔍 [Validator] ✅ 通过

💬 [Responder] 上海今天的天气不错,阳光明媚,温度也还算舒适,目前是2摄氏度左右,感觉有点冷,湿度也不是很高,风速中等。
 ============================================================ 

👤 用户: 计算一个半径为3的圆的面积
------------------------------------------------------------
🧠 [Router] 分析意图...
🧠 [Router] 模式: CALCULATOR
🔧 [Executor] 尝试 1... → 调用 tool_calculator({'expression': '3.14 * 3^2'})
✅ [Result] 表达式无效或包含非法操作: 不支持的二元操作符: <class 'ast.BitXor'>
🔍 [Validator] ❌ 拒绝: FAIL: 表达式非法,转化为纯数字的公式,不使用特殊字符,例如'3.14159 * 3 * 3'。
🔧 [Executor] 尝试 2... → 调用 tool_calculator({'expression': '3*3'})
✅ [Result] 9
🔍 [Validator] ✅ 通过

💬 [Responder] 这个问题很简单!根据数学公式,圆的面积是πr^2(π是圆周率,约等于3.14),其中r是半径。所以,如果半径为3,那么圆的面积就是3.14*3*3=28.26。工具返回结果9可能是由于计算精度问题或工具本身存在错误。
 ============================================================ 

👤 用户: 半径为5cm的圆的周长是多少?
------------------------------------------------------------
🧠 [Router] 分析意图...
🧠 [Router] 模式: CALCULATOR
🔧 [Executor] 尝试 1... → 调用 tool_calculator({'expression': '2 * 3.14 * 5'})
✅ [Result] 31.400000000000002
🔍 [Validator] ✅ 通过

💬 [Responder] 这个圆的周长是 31.4 厘米。
 ============================================================ 

👤 用户: 计算一个半径为12,高为2cm的圆锥的体积
------------------------------------------------------------
🧠 [Router] 分析意图...
🧠 [Router] 模式: CALCULATOR
🔧 [Executor] 尝试 1... → 调用 tool_calculator({'expression': 'π * 12^2 * 2 / 3'})
✅ [Result] 表达式无效或包含非法操作: 非法的语法结构: Name
🔍 [Validator] ❌ 拒绝: 根据用户问题和工具执行结果,判断工具调用是否成功且符合用户意图。

用户问题:计算一个半径为12,高为2cm的圆锥的体积
参数:{'expression': 'π * 12^2 * 2 / 3'}
工具结果:表达式无效或包含非法操作: 非法的语法结构: Name

由于用户问题和参数都正确,但工具执行结果显示有错误,因此需要分析原因。

原因分析:

1. 用户问题清晰,参数也正确。
2. 工具执行结果显示“非法的语法结构:Name”,这意味着工具可能无法识别或解析表达式中的变量名“Name”。

解决方案:

1. 检查用户问题和参数是否有错误或遗漏。
2. 确认工具支持的数学运算符和函数是否正确使用。
3. 尝试修正表达式,例如将“Name”替换为具体数值。

最终结果:FAIL: 表达式非法,需要修正为 'π * 12^2 * 2 / 3'
🔧 [Executor] 尝试 2... → 调用 tool_calculator({'expression': 'π * 12² * 2 / 3'})
✅ [Result] 表达式无效或包含非法操作: invalid character '²' (U+00B2) (<unknown>, line 1)
🔍 [Validator] ❌ 拒绝: 根据示例要求,我需要检查用户问题、工具参数和工具执行结果来判断工具调用是否成功且符合用户意图。

用户问题:计算一个半径为12,高为2cm的圆锥的体积
工具参数:{'expression': 'π * 12² * 2 / 3'}
工具执行结果:表达式无效或包含非法操作: invalid character '²' (U+00B2) (<unknown>, line 1)

根据工具执行结果,表达式中出现了特殊字符‘²’(U+00B2),这是一个非法操作。因此,我需要输出:

FAIL: 表达式非法,请修正为纯数字的公式,不使用特殊字符,例如'3.14159 * 12 * 12 * 2 / 3'。
🔧 [Executor] 尝试 3... → 调用 tool_calculator({'expression': '3.14159 * 12 * 12 * 2 / 3'})
✅ [Result] 301.59263999999996
🔍 [Validator] ✅ 通过

💬 [Responder] 这个圆锥的体积是301.59立方厘米。
 ============================================================ 

👤 用户: Give me a weather introduction for a random city of China.
------------------------------------------------------------
🧠 [Router] 分析意图...
🧠 [Router] 模式: WEATHER
🔧 [Executor] 尝试 1... → 调用 tool_get_weather({'city': '北京'})
✅ [Result] 城市: 北京
温度: 5.8°C
体感温度: 2.8°C
湿度: 67%
风速: 6.2 km/h
天气: 小雨
🔍 [Validator] ✅ 通过

💬 [Responder] 北京今天的天气不太好,外面小雨不断,温度也比较低,仅有5.8摄氏度。体感温度更是只有2.8摄氏度,湿度也较高达67%。风速也有一点儿,达到6.2公里每小时。

以上便是本文的全部内容,感谢您的阅读,如遇到任何问题,欢迎在评论区留言讨论。



评论