背景简介
Responder Agent 是 Agentic AI 系列课程中的重要一环,专注于将工具执行结果转化为自然、友好的用户回答。作为整个智能 Agent 流程的输出端,它负责把复杂的工具调用结果包装成用户易于理解的语言,是提升用户体验的关键组件。
前置信息
- 完成 【Agentic AI - 第一课 - 从零实现基于 ReAct 范式的智能 Agent】
- 完成 【Agentic AI - 第二课 - 多Agent协同】
- 完成 【Agentic AI - 第三课 - 多Agent协同模块化】
- 完成 【Agentic AI - 第四课 - 优化 Router Agent 实现高效意图识别】
- 完成 【Agentic AI - 第五课 - 优化 Executor Agent】
- 完成 【Agentic AI - 第六课 - 优化 Validator Agent】
详细信息
核心功能概述
Responder Agent 的主要职责是根据用户问题和工具执行结果,生成自然、流畅的中文回答。它支持流式和非流式两种输出方式,具备完善的性能监控和异步处理能力。
代码架构解析
- 协议定义:系统通过 LLMClientProtocol 协议定义了与大语言模型交互的标准接口,确保了不同 LLM 客户端的兼容性。
- 配置管理:ResponderConfig 类提供了灵活的配置选项,包括模型选择、温度参数、超时设置和提示模板等。
- 性能监控:ResponderMetrics 类负责收集和统计响应生成的性能指标,包括调用次数、平均延迟和 token 使用量,帮助开发者优化系统性能。
代码样例
优化后 Responder 代码
"""响应Agent模块
提供基于大语言模型的自然语言响应生成功能,用于根据工具结果生成友好的用户回答。
主要功能包括响应生成、性能监控、异步处理和流式输出。
"""
import logging
from typing import Protocol, List, Dict, Any, Optional, runtime_checkable, AsyncIterator
from dataclasses import dataclass, field
from collections import defaultdict
from time import perf_counter
logger = logging.getLogger(__name__)
@runtime_checkable
class LLMClientProtocol(Protocol):
"""大语言模型客户端协议定义
定义了与大语言模型交互的标准接口,支持同步和异步对话功能。
"""
async def chat(
self,
messages: List[Dict[str, str]],
tools: List[Dict],
stream: bool = False
) -> Any:
"""与LLM进行对话交互
Args:
messages: 对话消息列表,包含角色和内容
tools: 可用工具列表
stream: 是否启用流式响应
Returns:
LLM的响应结果,可能是字符串或流式迭代器
"""
...
@dataclass
class ResponderConfig:
"""响应Agent配置类
包含响应生成所需的各种配置参数,包括模型设置、提示模板和性能选项。
"""
model: str = "llama3.1:8b"
temperature: float = 0.7
timeout: float = 60.0
response_prompt_template: str = (
"你是一个友好的助手。请根据工具结果回答用户的问题。\n"
"用户问题:{user_message}\n"
"工具返回结果:{tool_result}\n\n"
"请用自然、口语化的中文回答。"
)
default_response: str = "抱歉,我无法生成回答。"
enable_metrics: bool = True
def __post_init__(self):
"""初始化后验证配置参数的有效性"""
self._validate()
def _validate(self):
"""验证配置参数的有效性
Raises:
ValueError: 当参数不在有效范围内时抛出
"""
if not (0 <= self.temperature <= 2):
raise ValueError("temperature 必须在 0-2 之间")
if self.timeout <= 0:
raise ValueError("timeout 必须大于 0")
@dataclass
class ResponderMetrics:
"""响应指标统计类
用于收集和统计响应生成的性能指标,包括调用次数、延迟和token使用量。
"""
total_calls: int = 0
avg_latency_ms: float = 0.0
total_tokens: int = 0
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息
Returns:
包含所有性能指标的字典
"""
return {
"total_calls": self.total_calls,
"avg_latency_ms": round(self.avg_latency_ms, 2),
"total_tokens": self.total_tokens
}
class ResponderAgent:
"""响应Agent:负责根据工具结果生成友好的自然语言回答
提供流式和非流式两种响应生成方式,支持性能监控和异步处理。
"""
def __init__(self, client: LLMClientProtocol, config: ResponderConfig):
"""初始化响应Agent
Args:
client: 大语言模型客户端实例
config: 响应配置参数
"""
self.client = client
self.config = config
self.metrics = ResponderMetrics()
logger.info(f"ResponderAgent 初始化完成 - model={config.model}")
async def __aenter__(self):
"""异步上下文管理器入口
Returns:
自身实例,支持 async with 语法
"""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口
Args:
exc_type: 异常类型
exc_val: 异常值
exc_tb: 异常追踪信息
"""
await self.close()
async def generate_response_stream(
self,
user_message: str,
tool_result: str
) -> AsyncIterator[str]:
"""生成流式回答
根据用户消息和工具结果,通过LLM生成流式的自然语言回答。
支持性能监控和超时处理。
Args:
user_message: 用户原始消息
tool_result: 工具执行结果
Yields:
流式生成的纯文本内容片段
Examples:
>>> async for chunk in agent.generate_response_stream("你好", "工具结果"):
... print(chunk, end="")
你好!我是你的助手...
"""
self.metrics.total_calls += 1
start_time = perf_counter()
logger.info(f"📝 开始生成响应: {user_message[:50]}...")
try:
import asyncio
prompt = self.config.response_prompt_template.format(
user_message=user_message,
tool_result=tool_result
)
stream = await asyncio.wait_for(
self.client.chat(
messages=[{"role": "user", "content": prompt}],
tools=[],
stream=True
),
timeout=self.config.timeout
)
async for chunk in stream:
# 提取 content 内容(兼容多种 chunk 格式)
content = self._extract_content(chunk)
# 跳过空内容
if not content:
continue
# 更新 token 统计
if self.config.enable_metrics:
tokens = getattr(chunk, 'eval_count', 0)
if tokens:
self.metrics.total_tokens += tokens
yield content
self._update_latency(start_time)
logger.info("✅ 响应生成完成")
except asyncio.TimeoutError:
logger.error(f"❌ 响应生成超时 ({self.config.timeout}s)")
yield self.config.default_response
except Exception as e:
logger.error(f"❌ 响应生成失败: {e}", exc_info=True)
yield self.config.default_response
def _extract_content(self, chunk: Any) -> str:
"""从 chunk 中提取文本内容
支持多种响应格式,包括StreamChunk对象、字符串和字典。
Args:
chunk: 流式响应块,可能是 StreamChunk 对象、字符串或字典
Returns:
提取的文本内容
"""
if hasattr(chunk, 'content'):
# StreamChunk 对象
content = chunk.content
elif isinstance(chunk, str):
# 纯字符串
content = chunk
elif isinstance(chunk, dict):
# 字典格式
content = chunk.get('content', '')
else:
# 其他情况转为字符串
content = str(chunk)
return content
async def generate_response(self, user_message: str, tool_result: str) -> str:
"""生成完整回答(非流式)
收集所有流式响应片段并组合成完整的回答文本。
Args:
user_message: 用户原始消息
tool_result: 工具执行结果
Returns:
完整的回答内容
"""
response_parts = []
async for chunk in self.generate_response_stream(user_message, tool_result):
response_parts.append(chunk)
return "".join(response_parts)
def _update_latency(self, start_time: float):
"""更新延迟指标
计算响应延迟并更新平均延迟统计。
Args:
start_time: 响应开始时间戳
"""
if not self.config.enable_metrics:
return
latency = (perf_counter() - start_time) * 1000
self.metrics.avg_latency_ms = (
(self.metrics.avg_latency_ms * (self.metrics.total_calls - 1) + latency)
/ self.metrics.total_calls
)
def get_metrics(self) -> Dict[str, Any]:
"""获取性能指标统计
Returns:
包含性能指标的字典,如果禁用指标则返回提示信息
"""
if not self.config.enable_metrics:
return {"metrics_disabled": True}
return self.metrics.get_stats()
async def close(self):
"""清理资源并关闭Agent
关闭底层客户端连接并记录日志。
"""
if hasattr(self.client, 'close'):
await self.client.close()
logger.info("ResponderAgent 已关闭")
def create_responder_agent(
base_url: str = "http://localhost:11434",
config: Optional[ResponderConfig] = None
) -> ResponderAgent:
"""创建配置好的ResponderAgent实例
工厂函数,用于快速创建配置完整的响应Agent实例。
Args:
base_url: Ollama服务的基础URL
config: 响应配置,如果为None则使用默认配置
Returns:
配置完成的ResponderAgent实例
Examples:
>>> agent = create_responder_agent()
>>> response = await agent.generate_response("你好", "工具结果")
"""
from app.core.pers_ollama import OllamaClient
final_config = config or ResponderConfig()
client = OllamaClient(
base_url=base_url,
model=final_config.model,
temperature=final_config.temperature
)
return ResponderAgent(client=client, config=final_config)
执行样例代码
"""
agent_responder 独立使用示例
本模块演示如何单独使用 ResponderAgent 将工具执行结果转化为友好的自然语言回复。
不依赖 Router,直接模拟工具返回的结果。
"""
import asyncio
import logging
# from app.core.pers_logging import setup_logging
# setup_logging()
from app.ai_agent.agent_responder import create_responder_agent, ResponderConfig
# 模拟工具返回结果的辅助数据
MOCK_TOOL_RESULTS = {
"weather": "城市:兰州,日期:2026-01-01,天气:多云转晴,气温:-5°C 到 5°C,风力:3级。",
"calc": "56088",
"error": "错误:无法连接到天气服务接口。"
}
async def example_streaming_response():
"""示例1:流式响应(天气查询场景)"""
print("=" * 50)
print("示例1:流式响应(天气查询)")
print("=" * 50 + "\n")
async with create_responder_agent() as responder:
user_query = "兰州今天天气怎么样?"
tool_result = MOCK_TOOL_RESULTS["weather"]
print(f"👤 用户问题: {user_query}")
print(f"🛠️ 工具结果: {tool_result}\n")
print(f"🤖 AI回复 (流式): ", end="", flush=True)
try:
# 流式逐字输出
async for chunk in responder.generate_response_stream(user_query, tool_result):
print(chunk, end="", flush=True)
print("\n") # 换行
except Exception as e:
print(f"\n❌ 生成失败: {e}")
print("-" * 50 + "\n")
async def example_non_streaming_response():
"""示例2:非流式响应(计算器场景)"""
print("=" * 50)
print("示例2:非流式响应(计算器)")
print("=" * 50 + "\n")
async with create_responder_agent() as responder:
user_query = "帮我算一下 123 乘以 456 是多少?"
tool_result = MOCK_TOOL_RESULTS["calc"]
print(f"👤 用户问题: {user_query}")
print(f"🛠️ 工具结果: {tool_result}\n")
try:
# 直接获取完整回复
response_text = await responder.generate_response(user_query, tool_result)
print(f"🤖 AI回复: {response_text}")
except Exception as e:
print(f"❌ 生成失败: {e}")
print("-" * 50 + "\n")
async def example_batch_processing():
"""示例3:批量处理多种场景"""
print("=" * 50)
print("示例3:批量处理")
print("=" * 50 + "\n")
# 定义测试数据集:(用户问题, 模拟的工具结果)
test_cases = [
("上海冷不冷?", "城市:上海,气温:18°C,天气:小雨,体感较冷。"),
("100 除以 4 等于多少", "25"),
("刚才的计算报错了吗?", MOCK_TOOL_RESULTS["error"]),
("深圳今天有雨吗", "城市:深圳,气温:24°C,天气:晴朗,无降水。")
]
async with create_responder_agent() as responder:
for query, result in test_cases:
print(f"👤 问题: {query}")
print(f"📄 原始结果: {result}")
try:
# 这里使用非流式方式以便整齐展示,实际生产中可用流式
reply = await responder.generate_response(query, result)
print(f"💬 回复: {reply[:80]}{'...' if len(reply) > 80 else ''}")
except Exception as e:
print(f"❌ 失败: {e}")
print()
print("-" * 50 + "\n")
async def example_with_metrics():
"""示例4:获取性能指标"""
print("=" * 50)
print("示例4:性能指标监控")
print("=" * 50 + "\n")
async with create_responder_agent() as responder:
# 执行一批任务以产生数据
queries = [
("北京天气如何?", MOCK_TOOL_RESULTS["weather"]),
("1+1", "2"),
("天津热吗", "35°C,很热。"),
("2*3*4", "24"),
("再见", "再见") # 这里没有工具结果,也可以测试通用回复
]
print("正在执行测试任务以收集指标...\n")
for q, r in queries:
try:
await responder.generate_response(q, r)
except Exception as e:
logging.error(f"任务失败: {e}")
# 获取并展示指标
print("=" * 50)
print("📊 性能统计")
print("=" * 50)
stats = responder.get_metrics()
if "metrics_disabled" in stats:
print("指标统计已禁用")
else:
print(f"总回复生成次数: {stats['total_calls']}")
print(f"平均延迟: {stats['avg_latency_ms']} ms")
print(f"消耗Token总数: {stats['total_tokens']}")
print("\n" + "-" * 50 + "\n")
async def example_custom_config():
"""示例5:自定义配置(修改Prompt人设)"""
print("=" * 50)
print("示例5:自定义配置")
print("=" * 50 + "\n")
# 自定义配置:修改 Prompt 模板和温度
custom_config = ResponderConfig(
model="llama3.1:8b",
temperature=0.8, # 提高温度,让回答更活泼
response_prompt_template=(
"你是一个说话幽默、喜欢用表情包的助手。\n"
"请根据工具结果回答用户问题。\n"
"用户问题:{user_message}\n"
"工具返回结果:{tool_result}\n\n"
"请用简短、幽默的中文回答,必须包含至少一个emoji。"
)
)
print("当前配置:")
print(f" - Temperature: {custom_config.temperature}")
print(f" - Prompt: Humor/Emoji Mode\n")
async with create_responder_agent(config=custom_config) as responder:
user_query = "帮我算一下 99 + 1"
tool_result = "100"
print(f"👤 用户: {user_query}")
print(f"🛠️ 工具结果: {tool_result}\n")
try:
reply = await responder.generate_response(user_query, tool_result)
print(f"🤖 AI回复 (幽默模式): {reply}")
except Exception as e:
print(f"❌ 生成失败: {e}")
print("\n" + "-" * 50 + "\n")
async def main():
"""运行所有示例"""
print("\n" + "🗣️" * 25)
print("ResponderAgent 独立使用示例集合")
print("🗣️" * 25 + "\n")
try:
await example_streaming_response()
await asyncio.sleep(0.5)
await example_non_streaming_response()
await asyncio.sleep(0.5)
await example_batch_processing()
await asyncio.sleep(0.5)
await example_with_metrics()
await asyncio.sleep(0.5)
await example_custom_config()
except Exception as e:
logging.error(f"程序运行出错: {e}", exc_info=True)
if __name__ == "__main__":
# 如果需要调试日志,取消下面注释
# logging.basicConfig(level=logging.INFO)
asyncio.run(main())
执行结果
🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️
ResponderAgent 独立使用示例集合
🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️
==================================================
示例1:流式响应(天气查询)
==================================================
👤 用户问题: 兰州今天天气怎么样?
🛠️ 工具结果: 城市:兰州,日期:2026-01-01,天气:多云转晴,气温:-5°C 到 5°C,风力:3级。
🤖 AI回复 (流式): 兰州今天的天气应该是比较舒适的,温度在零下五度到五度之间,风力也不是很大,大概三级左右。早上可能还有点儿多云,但逐渐变成晴天了。
--------------------------------------------------
==================================================
示例2:非流式响应(计算器)
==================================================
👤 用户问题: 帮我算一下 123 乘以 456 是多少?
🛠️ 工具结果: 56088
🤖 AI回复: 乘法结果是:56,088
--------------------------------------------------
==================================================
示例3:批量处理
==================================================
👤 问题: 上海冷不冷?
📄 原始结果: 城市:上海,气温:18°C,天气:小雨,体感较冷。
💬 回复: 应该是有点 chilly 啦!今天上海的温度比较低,小雨也是有的,所以感觉会比较冷点。
👤 问题: 100 除以 4 等于多少
📄 原始结果: 25
💬 回复: 这道题正确答案是25,因为100除以4等于25。
👤 问题: 刚才的计算报错了吗?
📄 原始结果: 错误:无法连接到天气服务接口。
💬 回复: 是的,我遇到了问题。我试图获取天气信息,但是连接失败了。我们可以尝试重新启动或检查网络设置看看是否能解决这个问题。
👤 问题: 深圳今天有雨吗
📄 原始结果: 城市:深圳,气温:24°C,天气:晴朗,无降水。
💬 回复: 不会有雨的。
--------------------------------------------------
==================================================
示例4:性能指标监控
==================================================
正在执行测试任务以收集指标...
==================================================
📊 性能统计
==================================================
总回复生成次数: 5
平均延迟: 821.53 ms
消耗Token总数: 0
--------------------------------------------------
==================================================
示例5:自定义配置
==================================================
当前配置:
- Temperature: 0.8
- Prompt: Humor/Emoji Mode
👤 用户: 帮我算一下 99 + 1
🛠️ 工具结果: 100
🤖 AI回复 (幽默模式): 哈哈~ 算完了! 🤯 结果是 100 啦!
--------------------------------------------------
以上便是本文的全部内容,感谢您的阅读,如遇到任何问题,欢迎在评论区留言讨论。