Henry
发布于 2026-01-07 / 2 阅读
0
0

Agentic AI - 第七课 - 优化 Responder Agent

背景简介

Responder Agent 是 Agentic AI 系列课程中的重要一环,专注于将工具执行结果转化为自然、友好的用户回答。作为整个智能 Agent 流程的输出端,它负责把复杂的工具调用结果包装成用户易于理解的语言,是提升用户体验的关键组件。

前置信息

详细信息

核心功能概述

Responder Agent 的主要职责是根据用户问题和工具执行结果,生成自然、流畅的中文回答。它支持流式和非流式两种输出方式,具备完善的性能监控和异步处理能力。

代码架构解析

  • 协议定义:系统通过 LLMClientProtocol 协议定义了与大语言模型交互的标准接口,确保了不同 LLM 客户端的兼容性。
  • 配置管理:ResponderConfig 类提供了灵活的配置选项,包括模型选择、温度参数、超时设置和提示模板等。
  • 性能监控:ResponderMetrics 类负责收集和统计响应生成的性能指标,包括调用次数、平均延迟和 token 使用量,帮助开发者优化系统性能。

代码样例

优化后 Responder 代码

"""响应Agent模块
提供基于大语言模型的自然语言响应生成功能,用于根据工具结果生成友好的用户回答。
主要功能包括响应生成、性能监控、异步处理和流式输出。
"""
import logging
from typing import Protocol, List, Dict, Any, Optional, runtime_checkable, AsyncIterator
from dataclasses import dataclass, field
from collections import defaultdict
from time import perf_counter
logger = logging.getLogger(__name__)
@runtime_checkable
class LLMClientProtocol(Protocol):
    """大语言模型客户端协议定义
    
    定义了与大语言模型交互的标准接口,支持同步和异步对话功能。
    """
    
    async def chat(
        self, 
        messages: List[Dict[str, str]], 
        tools: List[Dict],
        stream: bool = False
    ) -> Any:
        """与LLM进行对话交互
        
        Args:
            messages: 对话消息列表,包含角色和内容
            tools: 可用工具列表
            stream: 是否启用流式响应
            
        Returns:
            LLM的响应结果,可能是字符串或流式迭代器
        """
        ...
@dataclass
class ResponderConfig:
    """响应Agent配置类
    
    包含响应生成所需的各种配置参数,包括模型设置、提示模板和性能选项。
    """
    
    model: str = "llama3.1:8b"
    temperature: float = 0.7
    timeout: float = 60.0
    response_prompt_template: str = (
        "你是一个友好的助手。请根据工具结果回答用户的问题。\n"
        "用户问题:{user_message}\n"
        "工具返回结果:{tool_result}\n\n"
        "请用自然、口语化的中文回答。"
    )
    default_response: str = "抱歉,我无法生成回答。"
    enable_metrics: bool = True
    
    def __post_init__(self):
        """初始化后验证配置参数的有效性"""
        self._validate()
    
    def _validate(self):
        """验证配置参数的有效性
        
        Raises:
            ValueError: 当参数不在有效范围内时抛出
        """
        if not (0 <= self.temperature <= 2):
            raise ValueError("temperature 必须在 0-2 之间")
        if self.timeout <= 0:
            raise ValueError("timeout 必须大于 0")
@dataclass
class ResponderMetrics:
    """响应指标统计类
    
    用于收集和统计响应生成的性能指标,包括调用次数、延迟和token使用量。
    """
    
    total_calls: int = 0
    avg_latency_ms: float = 0.0
    total_tokens: int = 0
    
    def get_stats(self) -> Dict[str, Any]:
        """获取统计信息
        
        Returns:
            包含所有性能指标的字典
        """
        return {
            "total_calls": self.total_calls,
            "avg_latency_ms": round(self.avg_latency_ms, 2),
            "total_tokens": self.total_tokens
        }
class ResponderAgent:
    """响应Agent:负责根据工具结果生成友好的自然语言回答
    
    提供流式和非流式两种响应生成方式,支持性能监控和异步处理。
    """
    
    def __init__(self, client: LLMClientProtocol, config: ResponderConfig):
        """初始化响应Agent
        
        Args:
            client: 大语言模型客户端实例
            config: 响应配置参数
        """
        self.client = client
        self.config = config
        self.metrics = ResponderMetrics()
        logger.info(f"ResponderAgent 初始化完成 - model={config.model}")
    
    async def __aenter__(self):
        """异步上下文管理器入口
        
        Returns:
            自身实例,支持 async with 语法
        """
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步上下文管理器出口
        
        Args:
            exc_type: 异常类型
            exc_val: 异常值
            exc_tb: 异常追踪信息
        """
        await self.close()
    
    async def generate_response_stream(
        self, 
        user_message: str, 
        tool_result: str
    ) -> AsyncIterator[str]:
        """生成流式回答
        
        根据用户消息和工具结果,通过LLM生成流式的自然语言回答。
        支持性能监控和超时处理。
        
        Args:
            user_message: 用户原始消息
            tool_result: 工具执行结果
            
        Yields:
            流式生成的纯文本内容片段
            
        Examples:
            >>> async for chunk in agent.generate_response_stream("你好", "工具结果"):
            ...     print(chunk, end="")
            你好!我是你的助手...
        """
        self.metrics.total_calls += 1
        start_time = perf_counter()
        
        logger.info(f"📝 开始生成响应: {user_message[:50]}...")
        
        try:
            import asyncio
            
            prompt = self.config.response_prompt_template.format(
                user_message=user_message,
                tool_result=tool_result
            )
            
            stream = await asyncio.wait_for(
                self.client.chat(
                    messages=[{"role": "user", "content": prompt}], 
                    tools=[],
                    stream=True
                ),
                timeout=self.config.timeout
            )
            
            async for chunk in stream:
                # 提取 content 内容(兼容多种 chunk 格式)
                content = self._extract_content(chunk)
                
                # 跳过空内容
                if not content:
                    continue
                
                # 更新 token 统计
                if self.config.enable_metrics:
                    tokens = getattr(chunk, 'eval_count', 0)
                    if tokens:
                        self.metrics.total_tokens += tokens
                
                yield content
            
            self._update_latency(start_time)
            logger.info("✅ 响应生成完成")
            
        except asyncio.TimeoutError:
            logger.error(f"❌ 响应生成超时 ({self.config.timeout}s)")
            yield self.config.default_response
        except Exception as e:
            logger.error(f"❌ 响应生成失败: {e}", exc_info=True)
            yield self.config.default_response
    
    def _extract_content(self, chunk: Any) -> str:
        """从 chunk 中提取文本内容
        
        支持多种响应格式,包括StreamChunk对象、字符串和字典。
        
        Args:
            chunk: 流式响应块,可能是 StreamChunk 对象、字符串或字典
            
        Returns:
            提取的文本内容
        """
        if hasattr(chunk, 'content'):
            # StreamChunk 对象
            content = chunk.content
        elif isinstance(chunk, str):
            # 纯字符串
            content = chunk
        elif isinstance(chunk, dict):
            # 字典格式
            content = chunk.get('content', '')
        else:
            # 其他情况转为字符串
            content = str(chunk)
        
        return content
    
    async def generate_response(self, user_message: str, tool_result: str) -> str:
        """生成完整回答(非流式)
        
        收集所有流式响应片段并组合成完整的回答文本。
        
        Args:
            user_message: 用户原始消息
            tool_result: 工具执行结果
            
        Returns:
            完整的回答内容
        """
        response_parts = []
        async for chunk in self.generate_response_stream(user_message, tool_result):
            response_parts.append(chunk)
        
        return "".join(response_parts)
    
    def _update_latency(self, start_time: float):
        """更新延迟指标
        
        计算响应延迟并更新平均延迟统计。
        
        Args:
            start_time: 响应开始时间戳
        """
        if not self.config.enable_metrics:
            return
            
        latency = (perf_counter() - start_time) * 1000
        self.metrics.avg_latency_ms = (
            (self.metrics.avg_latency_ms * (self.metrics.total_calls - 1) + latency) 
            / self.metrics.total_calls
        )
    
    def get_metrics(self) -> Dict[str, Any]:
        """获取性能指标统计
        
        Returns:
            包含性能指标的字典,如果禁用指标则返回提示信息
        """
        if not self.config.enable_metrics:
            return {"metrics_disabled": True}
        return self.metrics.get_stats()
    
    async def close(self):
        """清理资源并关闭Agent
        
        关闭底层客户端连接并记录日志。
        """
        if hasattr(self.client, 'close'):
            await self.client.close()
        logger.info("ResponderAgent 已关闭")
def create_responder_agent(
    base_url: str = "http://localhost:11434",
    config: Optional[ResponderConfig] = None
) -> ResponderAgent:
    """创建配置好的ResponderAgent实例
    
    工厂函数,用于快速创建配置完整的响应Agent实例。
    
    Args:
        base_url: Ollama服务的基础URL
        config: 响应配置,如果为None则使用默认配置
        
    Returns:
        配置完成的ResponderAgent实例
        
    Examples:
        >>> agent = create_responder_agent()
        >>> response = await agent.generate_response("你好", "工具结果")
    """
    from app.core.pers_ollama import OllamaClient
    
    final_config = config or ResponderConfig()
    
    client = OllamaClient(
        base_url=base_url,
        model=final_config.model,
        temperature=final_config.temperature
    )
    
    return ResponderAgent(client=client, config=final_config)

执行样例代码

"""
agent_responder 独立使用示例
本模块演示如何单独使用 ResponderAgent 将工具执行结果转化为友好的自然语言回复。
不依赖 Router,直接模拟工具返回的结果。
"""

import asyncio
import logging

# from app.core.pers_logging import setup_logging
# setup_logging()

from app.ai_agent.agent_responder import create_responder_agent, ResponderConfig


# 模拟工具返回结果的辅助数据
MOCK_TOOL_RESULTS = {
    "weather": "城市:兰州,日期:2026-01-01,天气:多云转晴,气温:-5°C 到 5°C,风力:3级。",
    "calc": "56088",
    "error": "错误:无法连接到天气服务接口。"
}


async def example_streaming_response():
    """示例1:流式响应(天气查询场景)"""
    print("=" * 50)
    print("示例1:流式响应(天气查询)")
    print("=" * 50 + "\n")
    
    async with create_responder_agent() as responder:
        user_query = "兰州今天天气怎么样?"
        tool_result = MOCK_TOOL_RESULTS["weather"]
        
        print(f"👤 用户问题: {user_query}")
        print(f"🛠️  工具结果: {tool_result}\n")
        print(f"🤖 AI回复 (流式): ", end="", flush=True)
        
        try:
            # 流式逐字输出
            async for chunk in responder.generate_response_stream(user_query, tool_result):
                print(chunk, end="", flush=True)
            print("\n") # 换行
        except Exception as e:
            print(f"\n❌ 生成失败: {e}")
    
    print("-" * 50 + "\n")


async def example_non_streaming_response():
    """示例2:非流式响应(计算器场景)"""
    print("=" * 50)
    print("示例2:非流式响应(计算器)")
    print("=" * 50 + "\n")
    
    async with create_responder_agent() as responder:
        user_query = "帮我算一下 123 乘以 456 是多少?"
        tool_result = MOCK_TOOL_RESULTS["calc"]
        
        print(f"👤 用户问题: {user_query}")
        print(f"🛠️  工具结果: {tool_result}\n")
        
        try:
            # 直接获取完整回复
            response_text = await responder.generate_response(user_query, tool_result)
            print(f"🤖 AI回复: {response_text}")
        except Exception as e:
            print(f"❌ 生成失败: {e}")
    
    print("-" * 50 + "\n")


async def example_batch_processing():
    """示例3:批量处理多种场景"""
    print("=" * 50)
    print("示例3:批量处理")
    print("=" * 50 + "\n")
    
    # 定义测试数据集:(用户问题, 模拟的工具结果)
    test_cases = [
        ("上海冷不冷?", "城市:上海,气温:18°C,天气:小雨,体感较冷。"),
        ("100 除以 4 等于多少", "25"),
        ("刚才的计算报错了吗?", MOCK_TOOL_RESULTS["error"]),
        ("深圳今天有雨吗", "城市:深圳,气温:24°C,天气:晴朗,无降水。")
    ]
    
    async with create_responder_agent() as responder:
        for query, result in test_cases:
            print(f"👤 问题: {query}")
            print(f"📄 原始结果: {result}")
            
            try:
                # 这里使用非流式方式以便整齐展示,实际生产中可用流式
                reply = await responder.generate_response(query, result)
                print(f"💬 回复: {reply[:80]}{'...' if len(reply) > 80 else ''}")
            except Exception as e:
                print(f"❌ 失败: {e}")
            
            print()
    
    print("-" * 50 + "\n")


async def example_with_metrics():
    """示例4:获取性能指标"""
    print("=" * 50)
    print("示例4:性能指标监控")
    print("=" * 50 + "\n")
    
    async with create_responder_agent() as responder:
        # 执行一批任务以产生数据
        queries = [
            ("北京天气如何?", MOCK_TOOL_RESULTS["weather"]),
            ("1+1", "2"),
            ("天津热吗", "35°C,很热。"),
            ("2*3*4", "24"),
            ("再见", "再见") # 这里没有工具结果,也可以测试通用回复
        ]
        
        print("正在执行测试任务以收集指标...\n")
        for q, r in queries:
            try:
                await responder.generate_response(q, r)
            except Exception as e:
                logging.error(f"任务失败: {e}")
        
        # 获取并展示指标
        print("=" * 50)
        print("📊 性能统计")
        print("=" * 50)
        stats = responder.get_metrics()
        
        if "metrics_disabled" in stats:
            print("指标统计已禁用")
        else:
            print(f"总回复生成次数: {stats['total_calls']}")
            print(f"平均延迟: {stats['avg_latency_ms']} ms")
            print(f"消耗Token总数: {stats['total_tokens']}")
    
    print("\n" + "-" * 50 + "\n")


async def example_custom_config():
    """示例5:自定义配置(修改Prompt人设)"""
    print("=" * 50)
    print("示例5:自定义配置")
    print("=" * 50 + "\n")
    
    # 自定义配置:修改 Prompt 模板和温度
    custom_config = ResponderConfig(
        model="llama3.1:8b",
        temperature=0.8,          # 提高温度,让回答更活泼
        response_prompt_template=(
            "你是一个说话幽默、喜欢用表情包的助手。\n"
            "请根据工具结果回答用户问题。\n"
            "用户问题:{user_message}\n"
            "工具返回结果:{tool_result}\n\n"
            "请用简短、幽默的中文回答,必须包含至少一个emoji。"
        )
    )
    
    print("当前配置:")
    print(f"  - Temperature: {custom_config.temperature}")
    print(f"  - Prompt: Humor/Emoji Mode\n")
    
    async with create_responder_agent(config=custom_config) as responder:
        user_query = "帮我算一下 99 + 1"
        tool_result = "100"
        
        print(f"👤 用户: {user_query}")
        print(f"🛠️  工具结果: {tool_result}\n")
        
        try:
            reply = await responder.generate_response(user_query, tool_result)
            print(f"🤖 AI回复 (幽默模式): {reply}")
        except Exception as e:
            print(f"❌ 生成失败: {e}")
    
    print("\n" + "-" * 50 + "\n")


async def main():
    """运行所有示例"""
    print("\n" + "🗣️" * 25)
    print("ResponderAgent 独立使用示例集合")
    print("🗣️" * 25 + "\n")
    
    try:
        await example_streaming_response()
        await asyncio.sleep(0.5)
        
        await example_non_streaming_response()
        await asyncio.sleep(0.5)
        
        await example_batch_processing()
        await asyncio.sleep(0.5)
        
        await example_with_metrics()
        await asyncio.sleep(0.5)
        
        await example_custom_config()
        
    except Exception as e:
        logging.error(f"程序运行出错: {e}", exc_info=True)


if __name__ == "__main__":
    # 如果需要调试日志,取消下面注释
    # logging.basicConfig(level=logging.INFO)
    asyncio.run(main())

执行结果

🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️
ResponderAgent 独立使用示例集合
🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️🗣️

==================================================
示例1:流式响应(天气查询)
==================================================

👤 用户问题: 兰州今天天气怎么样?
🛠️  工具结果: 城市:兰州,日期:2026-01-01,天气:多云转晴,气温:-5°C 到 5°C,风力:3级。

🤖 AI回复 (流式): 兰州今天的天气应该是比较舒适的,温度在零下五度到五度之间,风力也不是很大,大概三级左右。早上可能还有点儿多云,但逐渐变成晴天了。

--------------------------------------------------

==================================================
示例2:非流式响应(计算器)
==================================================

👤 用户问题: 帮我算一下 123 乘以 456 是多少?
🛠️  工具结果: 56088

🤖 AI回复: 乘法结果是:56,088
--------------------------------------------------

==================================================
示例3:批量处理
==================================================

👤 问题: 上海冷不冷?
📄 原始结果: 城市:上海,气温:18°C,天气:小雨,体感较冷。
💬 回复: 应该是有点 chilly 啦!今天上海的温度比较低,小雨也是有的,所以感觉会比较冷点。

👤 问题: 100 除以 4 等于多少
📄 原始结果: 25
💬 回复: 这道题正确答案是25,因为100除以4等于25。

👤 问题: 刚才的计算报错了吗?
📄 原始结果: 错误:无法连接到天气服务接口。
💬 回复: 是的,我遇到了问题。我试图获取天气信息,但是连接失败了。我们可以尝试重新启动或检查网络设置看看是否能解决这个问题。

👤 问题: 深圳今天有雨吗
📄 原始结果: 城市:深圳,气温:24°C,天气:晴朗,无降水。
💬 回复: 不会有雨的。

--------------------------------------------------

==================================================
示例4:性能指标监控
==================================================

正在执行测试任务以收集指标...

==================================================
📊 性能统计
==================================================
总回复生成次数: 5
平均延迟: 821.53 ms
消耗Token总数: 0

--------------------------------------------------

==================================================
示例5:自定义配置
==================================================

当前配置:
  - Temperature: 0.8
  - Prompt: Humor/Emoji Mode

👤 用户: 帮我算一下 99 + 1
🛠️  工具结果: 100

🤖 AI回复 (幽默模式): 哈哈~ 算完了! 🤯 结果是 100 啦!

--------------------------------------------------

以上便是本文的全部内容,感谢您的阅读,如遇到任何问题,欢迎在评论区留言讨论。



评论