背景简介
随着人工智能技术的快速发展,Agentic AI(代理式AI)已成为实现复杂任务自动化的重要工具。基于ReAct范式的智能Agent系统通过推理与行动的循环,能够自主完成参数提取、工具调用、错误修正等复杂任务。本文将深入探讨如何优化Executor Agent,实现高效的参数提取和工具调用功能,为构建更加智能、可靠的AI Agent系统提供参考。
前置信息
- 完成 【Agentic AI - 第一课 - 从零实现基于 ReAct 范式的智能 Agent】
- 完成 【Agentic AI - 第二课 - 多Agent协同】
- 完成 【Agentic AI - 第三课 - 多Agent协同模块化】
- 完成 【Agentic AI - 第四课 - 优化 Router Agent 实现高效意图识别】
详细信息
核心架构设计
优化后的Executor Agent采用了协议化设计理念,通过定义LLMClientProtocol接口确保与不同大语言模型实现的兼容性。这种设计使得Executor Agent能够灵活适配各种LLM服务,包括本地部署的Ollama和云端API服务。
系统架构包含以下几个核心组件:
- LLMClientProtocol:定义了与LLM交互的标准接口
- ExecutorConfig:提供灵活的配置管理
- ExecutorMetrics:实现全面的性能监控
- ExecutorAgent:核心执行引擎,负责参数提取和工具调用
配置管理系统
ExecutorConfig类实现了高度可配置的设计,支持模型参数、超时设置和系统提示的灵活定制。配置系统包含了完善的参数验证机制,确保所有配置参数的有效性。特别是在处理中文城市名时,系统采用了严格的参数提取规则,确保城市名的准确传递。
配置项包括:
- 模型选择和温度参数控制
- 超时时间和重试次数设置
- 可定制的系统提示模板
- 参数有效性验证机制
性能监控与指标统计
ExecutorMetrics类提供了全面的性能监控功能,能够实时跟踪执行过程中的各项关键指标。监控指标包括执行次数、成功率、工具调用分布、平均延迟等,为系统优化提供了数据支撑。
性能监控特点:
- 实时统计执行成功率和失败率
- 记录超时次数和重试情况
- 分析工具调用分布模式
- 计算平均响应延迟
异步处理与错误修正
ExecutorAgent支持完整的异步处理机制,通过asyncio实现高效的并发处理。系统内置了重试机制,能够优雅处理超时和异常情况,确保系统的稳定性和可靠性。
异步处理特性:
- 支持异步上下文管理器
- 实现超时控制和重试逻辑
- 提供资源清理和连接管理
- 支持批量处理和并发执行
工具调用机制
系统采用了灵活的工具调用架构,支持WEATHER和CALCULATOR两种工具类型。通过工具映射机制,能够根据指定的工具类型动态选择合适的工具进行调用。工具调用过程包含了完整的参数提取、验证和执行流程。
工具调用特点:
- 动态工具选择和映射
- 智能参数提取和验证
- 支持历史消息处理
- 提供详细的执行日志
特点
- 高度模块化:各组件职责清晰,便于维护和扩展
- 协议化设计:支持多种LLM实现,提高系统灵活性
- 完善的监控:提供全面的性能指标和执行统计
- 异步处理:支持高并发和大规模应用场景
- 错误恢复:内置重试机制和异常处理,确保系统稳定性
示例代码
- 优化后 Executor Agent 代码
# app/ai_agent/agent_executor.py
"""执行Agent模块
提供基于大语言模型的工具执行功能,用于提取参数并调用指定工具。
主要功能包括参数提取、工具调用、错误修正、性能监控和异步处理。
"""
import logging
import asyncio
from typing import Protocol, List, Dict, Any, Optional, runtime_checkable
from dataclasses import dataclass, field
from collections import defaultdict
from time import perf_counter
logger = logging.getLogger(__name__ )
@runtime_checkable
class LLMClientProtocol(Protocol):
"""大语言模型客户端协议定义
定义了与大语言模型交互的标准接口,确保不同LLM实现的一致性
"""
async def chat(self, messages: List[Dict[str, str]], tools: List[Dict]) -> Any:
"""与LLM进行对话交互
Args:
messages: 对话消息列表,包含角色和内容
tools: 可用工具列表
Returns:
LLM的响应结果,格式取决于具体实现
"""
...
@dataclass
class ExecutorConfig:
"""执行Agent配置类
包含LLM参数、超时设置和系统提示等配置,用于控制执行Agent的行为
"""
model: str = "llama3.1:8b"
temperature: float = 0.2
timeout: float = 30.0
# 修改 ExecutorConfig 中的 executor_prompt_template
executor_prompt_template: str = (
"你是工具调用专家。你的任务是分析用户意图并提取参数以调用工具。\n"
"当前模式:{tool_type}\n\n"
"重要规则:\n"
"1. 必须使用系统提供的工具来响应用户请求。\n"
"2. 【中文城市名处理】:如果用户问的是中文城市名(如兰州、上海),"
"必须**逐字复制**用户问题中的城市名作为参数,严禁翻译、拼音化或猜测。\n"
"3. 不要输出任何解释性文字,直接调用工具即可。"
)
max_retries: int = 3
def __post_init__(self):
"""初始化后验证配置参数的有效性
Raises:
ValueError: 当配置参数不符合要求时抛出
"""
self._validate()
def _validate(self):
"""验证配置参数的有效性
Raises:
ValueError: 当temperature不在0-2范围内或timeout不大于0时
"""
if not (0 <= self.temperature <= 2):
raise ValueError("temperature 必须在 0-2 之间")
if self.timeout <= 0:
raise ValueError("timeout 必须大于 0")
if self.max_retries < 0:
raise ValueError("max_retries 不能为负数")
@dataclass
class ExecutorMetrics:
"""执行指标统计类
用于跟踪和统计执行Agent的性能指标,包括执行次数、成功率、工具调用分布等
"""
total_executions: int = 0
successful_executions: int = 0
failed_executions: int = 0
timeout_count: int = 0
tool_calls_distribution: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
avg_latency_ms: float = 0.0
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息
Returns:
包含执行统计、成功率、工具调用分布和平均延迟的字典
"""
success_rate = (
self.successful_executions / self.total_executions * 100
if self.total_executions > 0 else 0
)
return {
"total_executions": self.total_executions,
"successful_executions": self.successful_executions,
"failed_executions": self.failed_executions,
"timeout_count": self.timeout_count,
"success_rate": round(success_rate, 2),
"tool_calls_distribution": dict(self.tool_calls_distribution),
"avg_latency_ms": round(self.avg_latency_ms, 2)
}
class ExecutorAgent:
"""执行Agent:提取参数并调用指定工具
不依赖具体的LLM实现,通过协议接口与LLM客户端交互
支持重试机制、超时控制、历史消息处理和性能指标统计
Attributes:
client: 实现了LLMClientProtocol的客户端实例
config: 执行配置对象
metrics: 性能指标统计对象
"""
def __init__(self, client: LLMClientProtocol, config: ExecutorConfig):
"""初始化执行Agent
Args:
client: 实现了LLMClientProtocol的客户端实例
config: 执行配置对象
"""
self.client = client
self.config = config
self.metrics = ExecutorMetrics()
logger.info(f"ExecutorAgent 初始化完成 - model={config.model}, max_retries={config.max_retries}")
async def __aenter__(self):
"""异步上下文管理器入口
Returns:
返回自身实例,支持async with语法
"""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口
Args:
exc_type: 异常类型
exc_val: 异常值
exc_tb: 异常追踪
"""
await self.close()
def _get_allowed_tools(self, tool_type: str) -> List[Dict]:
"""根据工具类型获取允许使用的工具列表
Args:
tool_type: 工具类型 (WEATHER/CALCULATOR)
Returns:
匹配的工具列表
"""
from app.ai_agent.agent_tools import TOOLS
tool_mapping = {
"WEATHER": "tool_get_weather",
"CALCULATOR": "tool_calculator"
}
tool_name = tool_mapping.get(tool_type)
if not tool_name:
logger.warning(f"未知的工具类型: {tool_type}")
return []
allowed_tools = [t for t in TOOLS if t["function"]["name"] == tool_name]
logger.debug(f"工具类型 {tool_type} -> 允许工具: {[t['function']['name'] for t in allowed_tools]}")
return allowed_tools
def _build_messages(
self,
user_message: str,
tool_type: str,
history_msgs: Optional[List[Dict[str, str]]]
) -> List[Dict[str, str]]:
"""构建发送给LLM的消息列表
Args:
user_message: 用户输入的消息
tool_type: 工具类型
history_msgs: 历史对话消息
Returns:
构建好的消息列表
"""
system_content = self.config.executor_prompt_template.format(tool_type=tool_type)
if history_msgs:
messages = [{"role": "system", "content": system_content}] + history_msgs
logger.debug(f"使用历史消息,消息数: {len(history_msgs)}")
else:
messages = [
{"role": "system", "content": system_content},
{"role": "user", "content": user_message}
]
return messages
async def execute(
self,
user_message: str,
tool_type: str,
history_msgs: Optional[List[Dict[str, str]]] = None
) -> Any:
"""提取参数并执行工具调用
Args:
user_message: 用户输入的消息
tool_type: 工具类型 (WEATHER/CALCULATOR)
history_msgs: 历史对话消息(用于修正参数)
Returns:
ChatResponse: 包含工具调用信息的响应对象
Raises:
RuntimeError: 当重试次数耗尽时抛出
"""
self.metrics.total_executions += 1
start_time = perf_counter()
logger.info(f"🔧 开始执行 - tool_type={tool_type}, message={user_message[:50]}...")
allowed_tools = self._get_allowed_tools(tool_type)
if not allowed_tools:
self.metrics.failed_executions += 1
logger.error(f"❌ 未找到匹配的工具: {tool_type}")
raise ValueError(f"未知的工具类型: {tool_type}")
messages = self._build_messages(user_message, tool_type, history_msgs)
# 带重试机制的执行
last_error = None
for attempt in range(self.config.max_retries):
try:
response = await asyncio.wait_for(
self.client.chat(messages=messages, tools=allowed_tools),
timeout=self.config.timeout
)
# 记录工具调用
tool_calls = getattr(response, 'tool_calls', [])
if tool_calls:
for tool_call in tool_calls:
tool_name = getattr(tool_call, 'function', {}).get('name', 'unknown')
self.metrics.tool_calls_distribution[tool_name] += 1
self.metrics.successful_executions += 1
self._update_latency(start_time)
logger.info(f"✅ 执行成功 - tool_type={tool_type}, attempts={attempt + 1}")
return response
except asyncio.TimeoutError:
self.metrics.timeout_count += 1
last_error = f"执行超时 ({self.config.timeout}s)"
logger.error(f"❌ {last_error}, 尝试 {attempt + 1}/{self.config.max_retries}")
except Exception as e:
last_error = str(e)
logger.error(f"❌ 执行失败: {e}, 尝试 {attempt + 1}/{self.config.max_retries}", exc_info=True)
# 所有重试都失败
self.metrics.failed_executions += 1
logger.error(f"❌ 执行失败,重试次数耗尽: {last_error}")
raise RuntimeError(f"执行失败: {last_error}")
def _update_latency(self, start_time: float):
"""更新延迟指标
Args:
start_time: 开始时间戳
"""
latency = (perf_counter() - start_time) * 1000
self.metrics.avg_latency_ms = (
(self.metrics.avg_latency_ms * (self.metrics.total_executions - 1) + latency)
/ self.metrics.total_executions
)
def get_metrics(self) -> Dict[str, Any]:
"""获取性能指标统计
Returns:
包含各种性能指标的字典
"""
return self.metrics.get_stats()
async def close(self):
"""清理资源并关闭Agent
Note:
会关闭客户端连接
"""
if hasattr(self.client, 'close'):
await self.client.close()
logger.info("ExecutorAgent 已关闭")
def create_executor_agent(
base_url: str = "http://localhost:11434",
config: Optional[ExecutorConfig] = None
) -> ExecutorAgent:
"""创建配置好的ExecutorAgent实例
Args:
base_url: Ollama服务的基础URL
config: 可选的配置对象,未提供时使用默认配置
Returns:
配置完成的ExecutorAgent实例
Examples:
>>> agent = create_executor_agent()
>>> response = await agent.execute("兰州天气", "WEATHER")
>>> print(response.tool_calls)
使用自定义配置:
>>> config = ExecutorConfig(model="llama3.1:70b", max_retries=5)
>>> agent = create_executor_agent(config=config)
"""
from app.core.pers_ollama import OllamaClient
final_config = config or ExecutorConfig()
client = OllamaClient(
base_url=base_url,
model=final_config.model,
temperature=final_config.temperature
)
return ExecutorAgent(client=client, config=final_config)
- 单元测试代码
"""
agent_executor 独立使用示例
本模块演示如何单独使用 ExecutorAgent 进行工具参数提取和调用。
不依赖 Router,直接指定工具类型。
"""
import asyncio
import json
import logging
#from app.core.pers_logging import setup_logging
#setup_logging()
from app.ai_agent.agent_executor import create_executor_agent
from app.ai_agent.agent_tools import tool_get_weather, tool_calculator
def parse_tool_call_info(response) -> tuple:
"""
安全地从响应中提取工具调用信息
兼容字典和 ToolCall 对象两种返回格式
"""
# 1. 获取 tool_calls 列表
tool_calls = getattr(response, 'tool_calls', None)
# 如果是字典,使用 get 获取
if tool_calls is None and isinstance(response, dict):
tool_calls = response.get('tool_calls')
if not tool_calls or not isinstance(tool_calls, list):
# DEBUG: 打印原始响应以便调试
logging.warning(f"未找到 tool_calls,原始响应类型: {type(response)}")
if hasattr(response, 'content'):
logging.warning(f"响应内容: {response.content}")
return None, None
first_tool = tool_calls[0]
# 2. 解析 function 信息
func_name = None
func_args = None
if isinstance(first_tool, dict):
# 处理字典格式
func_data = first_tool.get('function', {})
func_name = func_data.get('name')
func_args = func_data.get('arguments')
else:
# 处理 ToolCall 对象格式 (注意:ToolCall.function 是一个 Dict)
func_data = getattr(first_tool, 'function', None)
if func_data and isinstance(func_data, dict):
func_name = func_data.get('name')
func_args = func_data.get('arguments')
elif func_data:
# 兜底:如果 func_data 不是字典而是对象(极少见)
func_name = getattr(func_data, 'name', None)
func_args = getattr(func_data, 'arguments', None)
return func_name, func_args
async def run_tool_execution(tool_name: str, arguments) -> str:
"""执行工具调用"""
try:
# 参数解析兼容处理
if isinstance(arguments, str):
args_dict = json.loads(arguments)
elif isinstance(arguments, dict):
args_dict = arguments
else:
return f"无法解析的参数类型: {type(arguments)}"
logging.info(f"🔧 执行工具: {tool_name}, 参数: {args_dict}")
if tool_name == "tool_get_weather":
return tool_get_weather(**args_dict)
elif tool_name == "tool_calculator":
return tool_calculator(**args_dict)
else:
return f"未知的工具: {tool_name}"
except json.JSONDecodeError:
error_msg = f"参数解析失败: {arguments}"
logging.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"工具执行异常: {str(e)}"
logging.error(error_msg, exc_info=True)
return error_msg
async def example_weather_query():
"""示例1:天气查询"""
print("=" * 50)
print("示例1:天气查询")
print("=" * 50 + "\n")
async with create_executor_agent() as executor:
user_query = "兰州今天天气怎么样?"
print(f"👤 用户: {user_query}")
print(f"🎯 指定工具类型: WEATHER\n")
# 直接指定工具类型
response = await executor.execute(user_query, tool_type="WEATHER")
# 提取工具信息
func_name, func_args = parse_tool_call_info(response)
if func_name and func_args:
print(f"🛠️ 识别工具: {func_name}")
print(f"📝 提取参数: {func_args}\n")
# 执行工具
result = await run_tool_execution(func_name, func_args)
print(f"✅ 执行结果:\n{result}")
else:
content = getattr(response, 'content', None) if not isinstance(response, dict) else response.get('content')
print(f"💬 LLM回复: {content}")
print("\n" + "-" * 50 + "\n")
async def example_calculator():
"""示例2:计算器"""
print("=" * 50)
print("示例2:计算器")
print("=" * 50 + "\n")
async with create_executor_agent() as executor:
user_query = "计算 (123 + 456) * 2 的结果"
print(f"👤 用户: {user_query}")
print(f"🎯 指定工具类型: CALCULATOR\n")
# 直接指定工具类型
response = await executor.execute(user_query, tool_type="CALCULATOR")
# 提取工具信息
func_name, func_args = parse_tool_call_info(response)
if func_name and func_args:
print(f"🛠️ 识别工具: {func_name}")
print(f"📝 提取参数: {func_args}\n")
# 执行工具
result = await run_tool_execution(func_name, func_args)
print(f"✅ 执行结果:\n{result}")
else:
content = getattr(response, 'content', None) if not isinstance(response, dict) else response.get('content')
print(f"💬 LLM回复: {content}")
print("\n" + "-" * 50 + "\n")
async def example_batch_processing():
"""示例3:批量处理"""
print("=" * 50)
print("示例3:批量处理")
print("=" * 50 + "\n")
async with create_executor_agent() as executor:
# 定义多个任务,每个任务包含查询和工具类型
tasks = [
("上海冷不冷?", "WEATHER"),
("100 除以 4 等于多少", "CALCULATOR"),
("深圳今天天气如何", "WEATHER"),
("计算 23 * 59 + 100", "CALCULATOR")
]
results = []
for user_query, tool_type in tasks:
print(f"👤 用户: {user_query}")
print(f"🎯 工具类型: {tool_type}")
try:
response = await executor.execute(user_query, tool_type=tool_type)
func_name, func_args = parse_tool_call_info(response)
if func_name and func_args:
result = await run_tool_execution(func_name, func_args)
results.append((user_query, result))
print(f"✅ 结果: {result[:60]}{'...' if len(result) > 60 else ''}")
else:
results.append((user_query, "未提取到工具调用"))
print(f"⚠️ 未提取到工具调用")
except Exception as e:
results.append((user_query, f"错误: {str(e)}"))
print(f"❌ 处理失败: {e}")
print()
# 汇总结果
print("=" * 50)
print("📋 执行汇总")
print("=" * 50)
for query, result in results:
print(f"• {query}")
print(f" {result}")
print()
print("-" * 50 + "\n")
async def example_with_metrics():
"""示例4:获取性能指标"""
print("=" * 50)
print("示例4:性能指标监控")
print("=" * 50 + "\n")
async with create_executor_agent() as executor:
# 执行多个任务
test_cases = [
("北京天气", "WEATHER"),
("1+1", "CALCULATOR"),
("天津热吗", "WEATHER"),
("2*3*4", "CALCULATOR"),
]
print("执行测试任务...\n")
for query, tool_type in test_cases:
try:
await executor.execute(query, tool_type=tool_type)
except Exception as e:
logging.error(f"任务失败: {e}")
# 获取性能指标
print("=" * 50)
print("📊 性能统计")
print("=" * 50)
stats = executor.get_metrics()
print(f"总执行次数: {stats['total_executions']}")
print(f"成功次数: {stats['successful_executions']}")
print(f"失败次数: {stats['failed_executions']}")
print(f"成功率: {stats['success_rate']}%")
print(f"平均延迟: {stats['avg_latency_ms']:.2f} ms")
print(f"工具调用分布:")
for tool, count in stats['tool_calls_distribution'].items():
print(f" - {tool}: {count} 次")
print("\n" + "-" * 50 + "\n")
async def example_custom_config():
"""示例5:自定义配置"""
print("=" * 50)
print("示例5:自定义配置")
print("=" * 50 + "\n")
from app.ai_agent.agent_executor import ExecutorConfig
# 自定义配置
config = ExecutorConfig(
model="llama3.1:8b",
temperature=0.1, # 降低温度以获得更稳定的参数提取
timeout=15.0, # 超时时间
max_retries=2 # 重试次数
)
print(f"配置信息:")
print(f" - Model: {config.model}")
print(f" - Temperature: {config.temperature}")
print(f" - Timeout: {config.timeout}s")
print(f" - Max Retries: {config.max_retries}\n")
async with create_executor_agent(config=config) as executor:
user_query = "计算 999 * 888 的结果"
print(f"👤 用户: {user_query}\n")
response = await executor.execute(user_query, tool_type="CALCULATOR")
func_name, func_args = parse_tool_call_info(response)
if func_name and func_args:
result = await run_tool_execution(func_name, func_args)
print(f"✅ 执行结果: {result}")
print("\n" + "-" * 50 + "\n")
async def main():
"""运行所有示例"""
print("\n" + "🛠️" * 25)
print("ExecutorAgent 独立使用示例集合")
print("🛠️" * 25 + "\n")
try:
await example_weather_query()
await asyncio.sleep(0.5)
await example_calculator()
await asyncio.sleep(0.5)
await example_batch_processing()
await asyncio.sleep(0.5)
await example_with_metrics()
await asyncio.sleep(0.5)
await example_custom_config()
except Exception as e:
logging.error(f"程序运行出错: {e}", exc_info=True)
if __name__ == "__main__":
asyncio.run(main())
- 执行结果
🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️
ExecutorAgent 独立使用示例集合
🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️🛠️
==================================================
示例1:天气查询
==================================================
👤 用户: 兰州今天天气怎么样?
🎯 指定工具类型: WEATHER
🛠️ 识别工具: tool_get_weather
📝 提取参数: {'city': '勘江'}
未找到城市: 勘江
✅ 执行结果:
无法获取天气信息
--------------------------------------------------
==================================================
示例2:计算器
==================================================
👤 用户: 计算 (123 + 456) * 2 的结果
🎯 指定工具类型: CALCULATOR
🛠️ 识别工具: tool_calculator
📝 提取参数: {'expression': '(123 + 456) * 2'}
✅ 执行结果:
1158
--------------------------------------------------
==================================================
示例3:批量处理
==================================================
👤 用户: 上海冷不冷?
🎯 工具类型: WEATHER
✅ 结果: 城市: 北京
温度: 3.4°C
体感温度: 0.9°C
湿度: 80%
风速: 2.9 km/h
天气: 大部分晴朗
👤 用户: 100 除以 4 等于多少
🎯 工具类型: CALCULATOR
❌ 处理失败: 'float' object is not subscriptable
👤 用户: 深圳今天天气如何
🎯 工具类型: WEATHER
未找到城市: 深湟
✅ 结果: 无法获取天气信息
👤 用户: 计算 23 * 59 + 100
🎯 工具类型: CALCULATOR
❌ 处理失败: 'int' object is not subscriptable
==================================================
📋 执行汇总
==================================================
• 上海冷不冷?
城市: 北京
温度: 3.4°C
体感温度: 0.9°C
湿度: 80%
风速: 2.9 km/h
天气: 大部分晴朗
• 100 除以 4 等于多少
25.0
• 100 除以 4 等于多少
错误: 'float' object is not subscriptable
• 深圳今天天气如何
无法获取天气信息
• 计算 23 * 59 + 100
1457
• 计算 23 * 59 + 100
错误: 'int' object is not subscriptable
--------------------------------------------------
==================================================
示例4:性能指标监控
==================================================
执行测试任务...
==================================================
📊 性能统计
==================================================
总执行次数: 4
成功次数: 4
失败次数: 0
成功率: 100.0%
平均延迟: 2359.89 ms
工具调用分布:
- tool_get_weather: 2 次
- tool_calculator: 2 次
--------------------------------------------------
==================================================
示例5:自定义配置
==================================================
配置信息:
- Model: llama3.1:8b
- Temperature: 0.1
- Timeout: 15.0s
- Max Retries: 2
👤 用户: 计算 999 * 888 的结果
✅ 执行结果: 887112
--------------------------------------------------
llama3.1:8b 的模型在初步分析的时候没有达到预期效果,后期需配合 validator 纠正模型执行结果。
以上便是本文的全部内容,感谢您的阅读,如遇到任何问题,欢迎在评论区留言讨论。