Henry
发布于 2026-01-06 / 7 阅读
0
0

Agentic AI - 第四课 - 优化 Router Agent 实现高效意图识别

背景简介

在智能 Agent 系统中,Router Agent 负责分析用户意图并分发到相应的处理模块,是整个系统的重要入口。本文将介绍如何优化 Router Agent,实现基于大语言模型的智能路由功能,包括意图识别、缓存机制、性能监控和异步处理等核心能力。

前置信息

详细信息

Router Agent 架构设计

优化后的 Router Agent 采用了模块化设计,通过协议接口与 LLM 客户端交互,实现了高内聚低耦合的架构。核心组件包括:

  • LLMClientProtocol:定义与大语言模型交互的标准接口
  • RouterConfig:路由配置类,包含模型参数、路由参数和缓存配置
  • RouterMetrics:性能指标统计类,用于跟踪路由Agent的性能表现
  • RouterAgent:核心路由类,实现意图分析和路由分发功能

优化后的 Router Agent 代码

# app/ai_agent/agent_router.py 
import logging
import asyncio
import time
from typing import Protocol, List, Dict, Any, Optional, runtime_checkable
from dataclasses import dataclass, field
from collections import defaultdict
from time import perf_counter
"""路由Agent模块
提供基于大语言模型的智能路由功能,用于分析用户意图并分发到相应的处理模块。
主要功能包括意图识别、缓存机制、性能监控和异步处理。
"""
@runtime_checkable
class LLMClientProtocol(Protocol):
    """大语言模型客户端协议定义
    
    定义了与大语言模型交互的标准接口,确保不同LLM实现的一致性
    """
    
    async def chat(self, messages: List[Dict[str, str]], tools: List[Dict]) -> Any:
        """与LLM进行对话交互
        
        Args:
            messages: 对话消息列表,包含角色和内容
            tools: 可用工具列表
            
        Returns:
            LLM的响应结果,格式取决于具体实现
        """
@dataclass
class RouterConfig:
    """路由Agent配置类
    
    包含LLM参数、路由参数和缓存配置等设置,用于控制路由Agent的行为
    """
    
    model: str = "llama3.1:8b"
    temperature: float = 0.2
    intent_prompt_template: str = (
        "意图分类(只输出模式名):\n"
        "- CHAT: 闲聊、常识\n"
        "- WEATHER: 查天气\n"
        "- CALCULATOR: 算数\n\n"
        "示例:\n"
        "用户: 月亮为什么圆 -> CHAT\n"
        "用户: 上海冷不冷 -> WEATHER\n"
        "用户: 123*456 -> CALCULATOR\n\n"
        "用户问题: {user_message}\n模式:"
    )
    intent_mapping: Dict[str, str] = field(default_factory=lambda: {
        "CALC": "CALCULATOR",
        "WEATHER": "WEATHER",
        "CHAT": "CHAT"
    })
    default_intent: str = "CHAT"
    timeout: float = 30.0
    enable_cache: bool = False
    cache_ttl: int = 300
    
    def __post_init__(self):
        """初始化后验证配置参数的有效性
        
        Raises:
            ValueError: 当配置参数不符合要求时抛出
        """
        self._validate()
    
    def _validate(self):
        """验证配置参数的有效性
        
        Raises:
            ValueError: 当temperature不在0-2范围内或timeout不大于0时
        """
        if not (0 <= self.temperature <= 2):
            raise ValueError("temperature 必须在 0-2 之间")
        if self.timeout <= 0:
            raise ValueError("timeout 必须大于 0")
@dataclass
class RouterMetrics:
    """路由指标统计类
    
    用于跟踪和统计路由Agent的性能指标,包括调用次数、缓存命中率等
    """
    
    total_calls: int = 0
    cache_hits: int = 0
    intent_distribution: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    avg_latency_ms: float = 0.0
    
    def get_stats(self) -> Dict[str, Any]:
        """获取统计信息
        
        Returns:
            包含总调用次数、缓存命中率、意图分布和平均延迟的字典
        """
class RouterAgent:
    """路由Agent:分析用户意图并路由到相应的处理模块
    
    不依赖具体的LLM实现,通过协议接口与LLM客户端交互
    支持缓存、超时控制和性能指标统计
    
    Attributes:
        client: 实现了LLMClientProtocol的客户端实例
        config: 路由配置对象
        metrics: 性能指标统计对象
        _cache: 内部缓存字典,存储消息到意图的映射
    """
    
    def __init__(self, client: LLMClientProtocol, config: RouterConfig):
        """初始化路由Agent
        
        Args:
            client: 实现了LLMClientProtocol的客户端实例
            config: 路由配置对象
        """
        self.client = client
        self.config = config
        self.metrics = RouterMetrics()
        self._cache: Dict[str, tuple[str, float]] = {}
        
        logging.info(f"RouterAgent 初始化完成 - model={config.model}")
    
    async def __aenter__(self):
        """异步上下文管理器入口
        
        Returns:
            返回自身实例,支持async with语法
        """
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步上下文管理器出口
        
        Args:
            exc_type: 异常类型
            exc_val: 异常值
            exc_tb: 异常追踪
        """
        await self.close()
    
    def _get_cached(self, key: str) -> Optional[str]:
        """获取缓存结果
        
        Args:
            key: 缓存键,通常为用户消息
            
        Returns:
            缓存的意图结果,如果不存在或已过期则返回None
        """
        if not self.config.enable_cache:
            return None
        cached = self._cache.get(key)
        if cached and time.time() - cached[1] < self.config.cache_ttl:
            self.metrics.cache_hits += 1
            logging.debug(f"缓存命中: {key}")
            return cached[0]
        return None
    
    def _set_cache(self, key: str, value: str):
        """设置缓存结果
        
        Args:
            key: 缓存键
            value: 要缓存的意图结果
        """
        if self.config.enable_cache:
            self._cache[key] = (value, time.time())
    
    async def classify(self, user_message: str) -> str:
        """分析用户消息的意图
        
        Args:
            user_message: 用户输入的消息
            
        Returns:
            识别出的意图类型
            
        Note:
            支持缓存机制以提高性能,失败时返回默认意图
            使用LLM进行意图分析,支持超时控制
        """
        self.metrics.total_calls += 1
        start_time = perf_counter()
        
        logging.info(f"🔍 开始分类: {user_message[:50]}...")
        
        cached = self._get_cached(user_message)
        if cached:
            self._update_latency(start_time)
            return cached
        
        try:
            prompt = self.config.intent_prompt_template.format(
                user_message=user_message
            )
            
            response = await asyncio.wait_for(
                self.client.chat(
                    messages=[{"role": "user", "content": prompt}],
                    tools=[]
                ),
                timeout=self.config.timeout
            )
            
            content = getattr(response, 'content', '').strip().upper()
            logging.debug(f"LLM 响应: {content}")
            
            intent = self._match_intent(content)
            
            self._set_cache(user_message, intent)
            
            logging.info(f"✅ 分类结果: {intent}")
            self._update_latency(start_time)
            
            return intent
            
        except asyncio.TimeoutError:
            logging.error(f"❌ 分类超时 ({self.config.timeout}s)")
            return self.config.default_intent
        except Exception as e:
            logging.error(f"❌ 分类失败: {e}", exc_info=True)
            return self.config.default_intent
    
    def _match_intent(self, content: str) -> str:
        """根据LLM响应内容匹配意图
        
        Args:
            content: LLM响应的内容
            
        Returns:
            匹配到的意图类型,未匹配时返回默认意图
        """
        for keyword, intent in self.config.intent_mapping.items():
            if keyword in content:
                return intent
        logging.warning(f"未匹配意图,使用默认: {self.config.default_intent}")
        return self.config.default_intent
    
    def _update_latency(self, start_time: float):
        """更新延迟指标
        
        Args:
            start_time: 开始时间戳
        """
        latency = (perf_counter() - start_time) * 1000
        self.metrics.avg_latency_ms = (
            (self.metrics.avg_latency_ms * (self.metrics.total_calls - 1) + latency) 
            / self.metrics.total_calls
        )
    
    def get_metrics(self) -> Dict[str, Any]:
        """获取性能指标统计
        
        Returns:
            包含各种性能指标的字典
        """
        return self.metrics.get_stats()
    
    async def close(self):
        """清理资源并关闭Agent
        
        Note:
            会关闭客户端连接并清空缓存
        """
        if hasattr(self.client, 'close'):
            await self.client.close()
        self._cache.clear()
        logging.info("RouterAgent 已关闭")
def create_router_agent(
    base_url: str = "http://localhost:11434",
    config: Optional[RouterConfig] = None
) -> RouterAgent:
    """创建配置好的RouterAgent实例
    
    Args:
        base_url: Ollama服务的基础URL
        config: 可选的配置对象,未提供时使用默认配置
        
    Returns:
        配置完成的RouterAgent实例
        
    Note:
        需要导入app.core.pers_ollama模块中的OllamaClient
        
    Examples:
        >>> agent = create_router_agent()
        >>> intent = await agent.classify("今天天气怎么样")
        >>> print(intent)
        WEATHER
    """
    from app.core.pers_ollama import OllamaClient
    
    final_config = config or RouterConfig()
    
    client = OllamaClient(
        base_url=base_url,
        model=final_config.model,
        temperature=final_config.temperature
    )
    
    return RouterAgent(client=client, config=final_config)

关键特性实现

  • 智能意图识别:Router Agent 通过模板化的提示词和 LLM 进行意图分析,支持多种意图类型
  • 缓存机制:为了提高性能,Router Agent 实现了智能缓存机制
    • 支持可配置的缓存开关
    • 设置缓存TTL(生存时间)
    • 缓存命中率统计
  • 异步处理:Router Agent 全面支持异步操作:
    • 使用 async/await 语法
    • 支持异步上下文管理器
    • 超时控制机制
  • 性能监控:内置完善的性能指标收集:
    • 总调用次数统计
    • 缓存命中率计算
    • 意图分布统计
    • 平均延迟监控

使用样例

  • 基本使用
# 使用工厂函数创建 Agent
router = create_router_agent()

# 进行意图分类
intent = await router.classify("今天北京冷不冷?")
print(f"识别意图: {intent}")  # 输出: WEATHER
  • 自定义配置
custom_config = RouterConfig(
    model="llama3.1:8b",
    temperature=0.3,
    default_intent="UNKNOWN",
    enable_cache=True,
    cache_ttl=600
)

router = create_router_agent(config=custom_config)
  • 意图处理
async def handle_user_message(user_message: str):
    intent = await router.classify(user_message)
    
    if intent == "CALCULATOR":
        print("→ 调用计算器处理...")
    elif intent == "WEATHER":
        print("→ 调用天气 API...")
    elif intent == "CHAT":
        print("→ 进入闲聊模式...")
    else:
        print("→ 使用默认处理...")

执行样例

"""
agent_router 使用示例
本模块提供了 RouterAgent 的完整使用示例,包括基本用法、自定义配置和意图处理等场景。
主要功能:
- 基本使用:演示如何使用工厂函数创建 RouterAgent 并进行意图分类
- 自定义配置:展示如何配置不同的模型、温度和意图映射
- 意图处理:演示如何根据识别的意图执行不同的处理逻辑
"""
import asyncio
# from app.core.pers_logging import setup_logging
# setup_logging()
from app.ai_agent.agent_router import (
    RouterAgent,
    RouterConfig,
    create_router_agent
)
async def example_basic_usage():
    """示例1:基本使用(工厂函数)
    演示如何使用工厂函数创建 RouterAgent 实例,并对不同的用户输入进行意图分类。
    该示例使用默认配置,展示了 RouterAgent 的基本工作流程。
    """
    print("=" * 50)
    print("示例1:基本使用")
    print("=" * 50)
    
    # 使用工厂函数创建 Agent(默认配置)
    router = create_router_agent()
    
    # 测试不同的用户输入
    test_messages = [
        "今天北京冷不冷?",
        "帮我算一下 123 * 456",
        "月亮为什么是圆的?",
        "1 + 1 等于多少"
    ]
    
    for msg in test_messages:
        intent = await router.classify(msg)
        print(f"用户: {msg}")
        print(f"意图: {intent}")
        print()
    
    # 关闭连接
    await router.close()
async def example_custom_config():
    """示例2:自定义配置
    演示如何创建自定义的 RouterConfig,包括指定不同的模型、调整温度参数、
    设置默认意图以及自定义意图映射。该示例展示了 RouterAgent 的灵活配置能力。
    """
    print("=" * 50)
    print("示例2:自定义配置")
    print("=" * 50)
    
    # 创建自定义配置
    custom_config = RouterConfig(
        model="llama3.1:8b",           # 使用不同模型
        temperature=0.3,            # 调整温度
        default_intent="UNKNOWN",  # 默认意图
        intent_prompt_template=(
            "请判断用户意图(只输出模式名):\n"
            "- SEARCH: 搜索信息\n"
            "- CHAT: 闲聊\n"
            "- CALC: 计算\n\n"
            "用户问题: {user_message}\n模式:"
        ),
        intent_mapping={
            "SEARCH": "WEB_SEARCH",
            "CALC": "CALCULATOR",
            "CHAT": "CHAT"
        }
    )
    
    # 使用自定义配置创建 Agent
    router = create_router_agent(config=custom_config)
    
    # 测试分类
    msg = "搜索最新的 AI 新闻"
    intent = await router.classify(msg)
    print(f"用户: {msg}")
    print(f"意图: {intent}")
    print(f"意图说明: {custom_config.intent_mapping.get(intent, '未知')}")
    
    await router.close()
async def example_intent_handler():
    """示例3:根据意图执行不同操作
    演示如何根据 RouterAgent 识别的意图来执行不同的处理逻辑。
    该示例包含一个内部函数 handle_user_message,展示了意图识别与业务逻辑的集成方式。
    """
    print("=" * 50)
    print("示例3:根据意图执行不同操作")
    print("=" * 50)
    
    # 创建 Router
    router = create_router_agent()
    
    async def handle_user_message(user_message: str):
        """根据意图处理用户消息
        Args:
            user_message: 用户输入的消息文本
        """
        intent = await router.classify(user_message)
        
        print(f"\n用户: {user_message}")
        print(f"识别意图: {intent}")
        
        # 根据意图分发处理逻辑
        if intent == "CALCULATOR":
            print("→ 调用计算器处理...")
            # calculator.calculate(user_message)
            
        elif intent == "WEATHER":
            print("→ 调用天气 API...")
            # weather_service.get_weather(user_message)
            
        elif intent == "CHAT":
            print("→ 进入闲聊模式...")
            # chatbot.reply(user_message)
            
        else:
            print("→ 使用默认处理...")
    
    # 模拟多个用户消息
    messages = [
        "明天会下雨吗?",
        "100 / 4 是多少",
        "你好啊",
        "今天的汇率是多少"
    ]
    
    for msg in messages:
        await handle_user_message(msg)
    
    await router.close()
async def main():
    """运行所有示例
    按顺序执行所有示例函数,展示 RouterAgent 的各种使用场景。
    包含异常处理机制,确保在出现错误时能够提供详细的错误信息。
    """
    print("\n" + "🤖" * 25)
    print("Agent Router 使用示例集合")
    print("🤖" * 25 + "\n")
    
    try:
        await example_basic_usage()
        await asyncio.sleep(0.5)
        
        await example_custom_config()
        await asyncio.sleep(0.5)
        
        await example_intent_handler()
        
    except Exception as e:
        print(f"\n❌ 运行出错: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    asyncio.run(main())
  • 执行结果
🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖
Agent Router 使用示例集合
🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖🤖

==================================================
示例1:基本使用
==================================================
用户: 今天北京冷不冷?
意图: WEATHER

用户: 帮我算一下 123 * 456
意图: CALCULATOR

用户: 月亮为什么是圆的?
意图: CHAT

用户: 1 + 1 等于多少
意图: CALCULATOR

==================================================
示例2:自定义配置
==================================================
用户: 搜索最新的 AI 新闻
意图: WEB_SEARCH
意图说明: 未知
==================================================
示例3:根据意图执行不同操作
==================================================

用户: 明天会下雨吗?
识别意图: WEATHER
→ 调用天气 API...

用户: 100 / 4 是多少
识别意图: CALCULATOR
→ 调用计算器处理...

用户: 你好啊
识别意图: CHAT
→ 进入闲聊模式...

用户: 今天的汇率是多少
识别意图: WEATHER
→ 调用天气 API...

以上便是本文的全部内容,感谢您的阅读,如遇到任何问题,欢迎在评论区留言讨论。



评论