Henry · Published 2026-01-06

Agentic AI - Lesson 1 - Implementing a ReAct-Based Agent from Scratch

Background

With the rapid progress of large language model (LLM) technology, making models not just "talk" but also "act" has become a central theme of agent research. This post walks through building, from scratch, an agent with real tool-calling capabilities, based on Python 3.11 and a locally deployed Ollama model (llama3.1:8b).
The project covers two core functions, weather lookup and math calculation, and demonstrates the complete Function Calling loop. We focus on applying the ReAct paradigm, and on defining clean tool schemas that help the LLM emit well-formed, structured JSON instructions, so the model can both reason over real-world data and perform logical computation.

Prerequisites

Python 3.11, a local Ollama instance serving llama3.1:8b, and the dependencies listed in the next section.

ReAct Flow

In the ReAct paradigm, the model alternates between reasoning about the task (Thought), invoking a tool (Action), and reading the tool's result (Observation), repeating until it can produce a final answer.
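As a mental model before diving into the code, here is a minimal, dependency-free sketch of that loop (illustrative only: call_llm and the shape of its return value are assumed stand-ins, and the concrete Ollama-backed implementation appears later in this post):

def react_loop(call_llm, tool_registry, question, max_iterations=5):
    """Thought -> Action -> Observation, repeated until a final answer."""
    messages = [{"role": "user", "content": question}]
    for _ in range(max_iterations):
        # Thought: the model reads the history and decides whether to act
        content, tool_calls = call_llm(messages)
        if not tool_calls:
            # No Action requested: the content is the final answer
            return content
        messages.append({"role": "assistant", "content": content, "tool_calls": tool_calls})
        for call in tool_calls:
            # Action: run the requested tool with the model-supplied arguments
            result = tool_registry[call["name"]](**call["arguments"])
            # Observation: feed the result back for the next round of thinking
            messages.append({"role": "tool", "content": str(result)})
    return "Iteration limit reached without a final answer."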

Implementation

Installing Dependencies

# requirements.txt
# HTTP client libraries
requests
httpx

# Retry logic
tenacity
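Install them before running any of the code below:

pip install -r requirements.txt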

Project Structure

The codebase is organized into three parts: the tool implementations, the LLM client wrapper, and the agent logic.
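Judging from the file paths used in the code below, the layout looks roughly like this:

app/
├── core/
│   └── pers_ollama.py      # async Ollama client
├── utils/
│   ├── weather.py          # Open-Meteo weather tool
│   └── pers_math.py        # AST-based safe calculator
└── ai_agent/
    └── agent_tools.py      # tool schemas and implementations
sample/
└── sample.py               # ReAct agent entry point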

Weather Lookup Tool

We use Open-Meteo as the weather data source: it is free and requires no API key.

The module implements a complete weather-lookup class, OpenMeteoWeather, with the following core features:

  • Geocoding: converts a city name (Chinese names supported) into latitude/longitude coordinates.
  • Current weather: fetches the current temperature, humidity, wind speed, and conditions for given coordinates.
  • Data parsing: maps WMO weather codes to human-readable descriptions (e.g. "Clear", "Heavy rain").
  • Error handling: wraps network requests in exception handling, so an unavailable service does not crash the main program.

Weather client code:
# app/utils/weather.py

import requests
from typing import Optional, Dict, Any
from dataclasses import dataclass
from enum import Enum


class WeatherCode(Enum):
    """WMO weather code enumeration"""
    CLEAR = 0
    MAINLY_CLEAR = 1
    PARTLY_CLOUDY = 2
    OVERCAST = 3
    FOG = 45
    DEPOSITING_RIME_FOG = 48
    DRIZZLE_LIGHT = 51
    DRIZZLE_MODERATE = 53
    DRIZZLE_DENSE = 55
    RAIN_FREEZING_LIGHT = 56
    RAIN_FREEZING_HEAVY = 57
    RAIN_SLIGHT = 61
    RAIN_MODERATE = 63
    RAIN_HEAVY = 65
    SNOW_FALL_SLIGHT = 71
    SNOW_FALL_MODERATE = 73
    SNOW_FALL_HEAVY = 75
    RAIN_SHOWERS_LIGHT = 80
    RAIN_SHOWERS_MODERATE = 81
    RAIN_SHOWERS_VIOLENT = 82
    THUNDERSTORM = 95
    THUNDERSTORM_HAIL = 96


@dataclass
class WeatherInfo:
    """Weather information record"""
    city: str
    temperature: float
    humidity: int
    wind_speed: float
    weather_code: int
    weather_desc: str
    feels_like: Optional[float] = None


class OpenMeteoWeather:
    """Client for the Open-Meteo weather API"""
    
    def __init__(self):
        self.base_url = "https://api.open-meteo.com/v1/forecast"
        self.geocoding_url = "https://geocoding-api.open-meteo.com/v1/search"
    
    def get_city_coordinates(self, city_name: str) -> Optional[tuple[float, float]]:
        """
        Resolve a city name to latitude/longitude coordinates.
        
        Args:
            city_name: City name (Chinese and English both supported)
        
        Returns:
            (latitude, longitude) tuple, or None on failure
        """
        try:
            params = {
                "name": city_name,
                "count": 1,
                "language": "zh",
                "format": "json"
            }
            
            response = requests.get(self.geocoding_url, params=params, timeout=10)
            response.raise_for_status()
            
            data = response.json()
            if data.get("results"):
                location = data["results"][0]
                return (location.get("latitude"), location.get("longitude"))
            return None
            
        except requests.exceptions.RequestException as e:
            print(f"Geocoding lookup failed: {e}")
            return None
    
    def get_current_weather(self, lat: float, lon: float, timezone: str = "auto") -> Optional[Dict[str, Any]]:
        """
        Fetch the current weather for a coordinate pair.
        
        Args:
            lat: Latitude
            lon: Longitude
            timezone: Time zone
        
        Returns:
            Weather data dict, or None on failure
        """
        try:
            params = {
                "latitude": lat,
                "longitude": lon,
                "current": "temperature_2m,relative_humidity_2m,apparent_temperature,weather_code,wind_speed_10m",
                "timezone": timezone,
                "forecast_days": 1
            }
            
            response = requests.get(self.base_url, params=params, timeout=10)
            response.raise_for_status()
            
            return response.json()
            
        except requests.exceptions.RequestException as e:
            print(f"Weather fetch failed: {e}")
            return None
    
    def get_forecast_weather(self, lat: float, lon: float, days: int = 7, timezone: str = "auto") -> Optional[Dict[str, Any]]:
        """
        Fetch the daily forecast.
        
        Args:
            lat: Latitude
            lon: Longitude
            days: Number of forecast days
            timezone: Time zone
        
        Returns:
            Forecast data dict, or None on failure
        """
        try:
            params = {
                "latitude": lat,
                "longitude": lon,
                "daily": "temperature_2m_max,temperature_2m_min,weather_code,precipitation_sum",
                "timezone": timezone,
                "forecast_days": min(days, 16)  # Open-Meteo supports at most 16 forecast days
            }
            
            response = requests.get(self.base_url, params=params, timeout=10)
            response.raise_for_status()
            
            return response.json()
            
        except requests.exceptions.RequestException as e:
            print(f"Forecast fetch failed: {e}")
            return None
    
    def get_weather_by_city(self, city_name: str) -> Optional[WeatherInfo]:
        """
        Look up the weather by city name (the full pipeline).
        
        Args:
            city_name: City name
        
        Returns:
            WeatherInfo object, or None on failure
        """
        # 1. Resolve the city to coordinates
        coordinates = self.get_city_coordinates(city_name)
        if not coordinates:
            print(f"City not found: {city_name}")
            return None
        
        lat, lon = coordinates
        
        # 2. Fetch the weather data
        weather_data = self.get_current_weather(lat, lon)
        if not weather_data:
            return None
        
        # 3. Parse the response
        current = weather_data.get("current", {})
        weather_code = current.get("weather_code", 0)
        
        return WeatherInfo(
            city=city_name,
            temperature=current.get("temperature_2m", 0),
            humidity=current.get("relative_humidity_2m", 0),
            wind_speed=current.get("wind_speed_10m", 0),
            weather_code=weather_code,
            weather_desc=self.weather_code_to_desc(weather_code),
            feels_like=current.get("apparent_temperature")
        )
    
    def weather_code_to_desc(self, code: int) -> str:
        """Map a WMO weather code to a human-readable description"""
        weather_map = {
            0: "Clear",
            1: "Mostly clear",
            2: "Partly cloudy",
            3: "Overcast",
            45: "Fog",
            48: "Depositing rime fog",
            51: "Light drizzle",
            53: "Moderate drizzle",
            55: "Dense drizzle",
            56: "Light freezing drizzle",
            57: "Dense freezing drizzle",
            61: "Light rain",
            63: "Moderate rain",
            65: "Heavy rain",
            66: "Light freezing rain",
            67: "Heavy freezing rain",
            71: "Light snow",
            73: "Moderate snow",
            75: "Heavy snow",
            77: "Snow grains",
            80: "Light showers",
            81: "Moderate showers",
            82: "Violent showers",
            85: "Light snow showers",
            86: "Heavy snow showers",
            95: "Thunderstorm",
            96: "Thunderstorm with hail",
            99: "Severe thunderstorm with hail"
        }
        return weather_map.get(code, "Unknown")
    
    def format_weather_info(self, weather_info: WeatherInfo) -> str:
        """Format a WeatherInfo record for display"""
        if not weather_info:
            return "Unable to retrieve weather information"
        # The unit is appended here, not again in the template below
        feels_like_display = f"{weather_info.feels_like}°C" if weather_info.feels_like is not None else "N/A"
        output = f"""
City: {weather_info.city}
Temperature: {weather_info.temperature}°C
Feels like: {feels_like_display}
Humidity: {weather_info.humidity}%
Wind speed: {weather_info.wind_speed} km/h
Conditions: {weather_info.weather_desc}
        """.strip()
        
        return output
    
    def format_forecast(self, forecast_data: Dict[str, Any]) -> str:
        """Format a forecast response for display"""
        if not forecast_data:
            return "Unable to retrieve the forecast"
        
        daily = forecast_data.get("daily", {})
        dates = daily.get("time", [])
        max_temps = daily.get("temperature_2m_max", [])
        min_temps = daily.get("temperature_2m_min", [])
        weather_codes = daily.get("weather_code", [])
        
        output = ["📅 Forecast:\n"]
        
        for i, date in enumerate(dates):
            max_temp = max_temps[i] if i < len(max_temps) else "N/A"
            min_temp = min_temps[i] if i < len(min_temps) else "N/A"
            weather_code = weather_codes[i] if i < len(weather_codes) else 0
            weather_desc = self.weather_code_to_desc(weather_code)
            
            output.append(f"  {date}: {min_temp}°C ~ {max_temp}°C, {weather_desc}")
        
        return "\n".join(output)


# Usage example
def main():
    # Create the weather client
    weather = OpenMeteoWeather()
    
    print("=" * 40)
    print("Open-Meteo Weather Lookup")
    print("=" * 40)
    
    # Example 1: look up by city name
    print("\n🏙️ City lookup:")
    cities = ["Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou"]
    
    for city in cities:
        weather_info = weather.get_weather_by_city(city)
        if weather_info:
            print(f"\n{weather.format_weather_info(weather_info)}")
        print("-" * 30)
    
    # Example 2: look up by coordinates
    print("\n📍 Coordinate lookup:")
    # Beijing
    lat, lon = 39.9042, 116.4074
    weather_data = weather.get_current_weather(lat, lon)
    
    if weather_data:
        current = weather_data["current"]
        weather_code = current["weather_code"]
        print(f"Beijing ({lat}, {lon}):")
        print(f"Temperature: {current['temperature_2m']}°C")
        print(f"Feels like: {current['apparent_temperature']}°C")
        print(f"Humidity: {current['relative_humidity_2m']}%")
        print(f"Wind speed: {current['wind_speed_10m']} km/h")
        print(f"Conditions: {weather.weather_code_to_desc(weather_code)}")
    
    # Example 3: forecast
    print("\n📊 Forecast:")
    forecast = weather.get_forecast_weather(lat, lon, days=7)
    if forecast:
        print(weather.format_forecast(forecast))
    
    # Example 4: interactive lookup
    print("\n🔍 Interactive lookup (enter q to quit):")
    while True:
        city = input("\nCity name: ").strip()
        if city.lower() == 'q':
            break
        
        if not city:
            continue
            
        weather_info = weather.get_weather_by_city(city)
        if weather_info:
            print(weather.format_weather_info(weather_info))
        else:
            print(f"❌ Lookup failed: city '{city}' not found")


if __name__ == "__main__":
    main()

Safe Math Calculator

An LLM-generated math expression could contain malicious code (e.g. __import__('os').system('rm -rf /')), so instead of passing it to Python's eval(), we implement a safe evaluator based on the AST (abstract syntax tree).

  • Whitelist: only basic arithmetic operators are allowed (ast.Add, ast.Sub, and so on).
  • Safe evaluation: the AST is traversed recursively to compute numeric expressions; any function call or non-numeric constant raises immediately.
  • Structured output: returns a dict containing the expression and its result, which the agent can parse easily.

Calculator code:
# app/utils/pers_math.py

import ast
import operator

# Whitelisted operators and their implementations
_ALLOWED_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
    ast.USub: operator.neg,  # unary minus, e.g. -5
}

def _eval_node(node):
    """Recursively evaluate an AST node"""
    # Numbers (Python 3.8+ uses ast.Constant; older versions used ast.Num)
    if isinstance(node, ast.Constant):
        if isinstance(node.value, (int, float)):
            return node.value
        raise TypeError(f"Unsupported constant type: {type(node.value)}")
    
    # Binary operations (e.g. 1 + 2)
    if isinstance(node, ast.BinOp):
        left = _eval_node(node.left)
        right = _eval_node(node.right)
        op_type = type(node.op)
        
        if op_type in _ALLOWED_OPERATORS:
            try:
                return _ALLOWED_OPERATORS[op_type](left, right)
            except ZeroDivisionError:
                raise ZeroDivisionError("Division by zero")
        else:
            raise TypeError(f"Unsupported binary operator: {op_type}")
    
    # Unary operations (e.g. -5)
    if isinstance(node, ast.UnaryOp):
        operand = _eval_node(node.operand)
        op_type = type(node.op)
        
        if op_type in _ALLOWED_OPERATORS:
            return _ALLOWED_OPERATORS[op_type](operand)
        else:
            raise TypeError(f"Unsupported unary operator: {op_type}")
    
    # Any other node type (function calls, names, ...) is rejected outright
    raise TypeError(f"Illegal syntax node: {type(node).__name__}")

def calculator(expression: str) -> dict:
    """
    Safely evaluate a math expression.
    """
    if not isinstance(expression, str) or not expression.strip():
        return {"error": "Input must not be empty"}

    try:
        # Parse the expression into an AST
        tree = ast.parse(expression, mode='eval')
        
        # Validate and evaluate
        result = _eval_node(tree.body)
        
        return {
            "expression": expression.strip(),
            "result": result
        }
        
    except ZeroDivisionError as e:
        return {"error": str(e)}
    except (SyntaxError, TypeError, ValueError, MemoryError) as e:
        # Syntax errors, or expressions containing illegal tokens/structures
        return {"error": f"Invalid expression or illegal operation: {str(e)}"}
    except Exception as e:
        # Catch-all
        return {"error": f"Calculation error: {str(e)}"}

if __name__ == "__main__":
    print(calculator("55*12"))
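A few quick sanity checks of the whitelist (run against app/utils/pers_math.py as defined above; the expected outputs in the comments follow directly from the error messages in that module):

from app.utils.pers_math import calculator

print(calculator("2 ** 10 / (4 - 2)"))
# {'expression': '2 ** 10 / (4 - 2)', 'result': 512.0}

print(calculator("__import__('os').system('id')"))
# {'error': 'Invalid expression or illegal operation: Illegal syntax node: Call'}

print(calculator("1 / 0"))
# {'error': 'Division by zero'}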

Async Ollama Client

This is the core component that talks to the local LLM. We use httpx to build a fully featured async client, OllamaClient.

  • Async I/O: built on Python's asyncio, supporting highly concurrent requests.
  • Automatic retries: uses tenacity to retry with exponential backoff on network hiccups and server-side 5xx errors.
  • Streaming and non-streaming: a single chat interface that can either return the full result at once or stream it with a typewriter effect.
  • Native tools support: passes the tools parameter straight to the Ollama API, with no manual prompt stitching to simulate tool calls.
  • Connection pooling: reuses HTTP connections to cut handshake overhead.

Ollama client code:
# app/core/pers_ollama.py

import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List, Optional, Literal

import httpx
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)


# ==========================================
# 1. Type definitions
# ==========================================
Role = Literal["system", "user", "assistant", "tool"]


@dataclass
class ToolCall:
    """Tool call structure"""
    id: str
    type: str = "function"
    function: Dict[str, Any] = field(default_factory=dict)


@dataclass
class AgentMessage:
    """Chat message structure"""
    role: Role
    content: str = ""
    tool_calls: Optional[List[ToolCall]] = None
    tool_call_id: Optional[str] = None
    
    def to_dict(self) -> Dict:
        """Convert to a plain dict"""
        data = {"role": self.role, "content": self.content}
        if self.tool_calls:
            data["tool_calls"] = [
                {"id": tc.id, "type": tc.type, "function": tc.function}
                for tc in self.tool_calls
            ]
        if self.tool_call_id:
            data["tool_call_id"] = self.tool_call_id
        return data


@dataclass
class GenerationResponse:
    """Generation response structure"""
    role: str
    content: str
    tool_calls: Optional[List[Dict]] = None
    model: Optional[str] = None
    done: bool = False
    prompt_eval_count: int = 0
    eval_count: int = 0
    
    @classmethod
    def from_ollama(cls, data: Dict) -> "GenerationResponse":
        """Build from a raw Ollama response"""
        message = data.get("message", {})
        tool_calls = message.get("tool_calls")
        
        return cls(
            role=message.get("role", "assistant"),
            content=message.get("content", ""),
            tool_calls=tool_calls,
            model=data.get("model"),
            done=data.get("done", False),
            prompt_eval_count=data.get("prompt_eval_count", 0),
            eval_count=data.get("eval_count", 0),
        )


# ==========================================
# 2. Exceptions
# ==========================================
class OllamaError(Exception):
    """Base Ollama exception"""
    pass


class OllamaConnectionError(OllamaError):
    """Connection error"""
    pass


class OllamaClientError(OllamaError):
    """Client error (4xx) - not retried"""
    pass


class OllamaServerError(OllamaError):
    """Server error (5xx) - retried"""
    pass


# ==========================================
# 3. Retry configuration
# ==========================================
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((OllamaConnectionError, OllamaServerError)),
    before_sleep=before_sleep_log(logging.getLogger(__name__), logging.WARNING),
    reraise=True,
)
async def _retry_request(request_func):
    """Run a request callable under the retry policy above"""
    return await request_func()


# ==========================================
# 4. Core client
# ==========================================
class OllamaClient:
    """
    Async Ollama client
    
    Features:
    - Async I/O
    - Automatic retries (connection errors and 5xx only)
    - Streaming and non-streaming responses
    - Tools support
    - Connection pooling
    """
    
    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        model: str = "llama3.1:8b",
        *,
        timeout: float = 60.0,
        num_ctx: Optional[int] = None,
        temperature: float = 0.7,
        max_connections: int = 10,
    ):
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.timeout = timeout
        self.num_ctx = num_ctx
        self.temperature = temperature
        
        # HTTP client configuration
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            timeout=httpx.Timeout(timeout),
            limits=httpx.Limits(
                max_keepalive_connections=max_connections // 2,
                max_connections=max_connections,
            ),
        )
    
    async def close(self) -> None:
        """Close the underlying HTTP client"""
        await self._client.aclose()
    
    async def __aenter__(self):
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
    
    # ========== Internal helpers ==========
    
    async def _request(
        self,
        method: str,
        endpoint: str,
        *,
        json_data: Optional[Dict] = None,
        stream: bool = False,
        timeout: Optional[float] = None,
    ) -> Dict | AsyncIterator[Dict]:
        """Unified request handling (uses Python 3.10+ `|` union syntax)"""
        
        async def _do_request():
            try:
                url = f"/api/{endpoint}"
                timeout_val = timeout if timeout is not None else (None if stream else self.timeout)
                
                response = await self._client.request(
                    method,
                    url,
                    json=json_data,
                    timeout=timeout_val,
                )
                response.raise_for_status()
                
                if stream:
                    return self._iter_lines(response)
                return response.json()
                
            except httpx.ConnectError as e:
                logging.error(f"Connection failed to {self.base_url}: {e}")
                raise OllamaConnectionError(f"Cannot connect to {self.base_url}") from e
                
            except httpx.HTTPStatusError as e:
                if e.response.status_code < 500:
                    logging.error(f"Client error ({e.response.status_code}): {e.response.text[:200]}")
                    raise OllamaClientError(f"API error {e.response.status_code}") from e
                
                logging.warning(f"Server error ({e.response.status_code}), will retry...")
                raise OllamaServerError(f"Server error {e.response.status_code}") from e
        
        return await _retry_request(_do_request)
    
    @staticmethod
    async def _iter_lines(response: httpx.Response) -> AsyncIterator[Dict]:
        """Parse a streaming NDJSON response line by line.

        Note: client.request() buffers the whole body before this iterator
        runs; for truly incremental streaming one would build the request and
        use client.send(request, stream=True) instead.
        """
        async for line in response.aiter_lines():
            line = line.strip()
            if not line or line.startswith("[DONE]"):
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                continue
    
    # ========== Chat interface ==========
    
    async def chat(
        self,
        messages: List[AgentMessage | Dict],
        *,
        tools: Optional[List[Dict]] = None,
        format: Optional[str] = None,
        temperature: Optional[float] = None,
        num_ctx: Optional[int] = None,
        stream: bool = False,
    ) -> GenerationResponse | AsyncIterator[str]:
        """
        Chat interface.
        
        Args:
            messages: Message list
            tools: Tool definitions
            format: Output format ("json")
            temperature: Sampling temperature
            num_ctx: Context window size
            stream: Whether to stream the response
            
        Returns:
            Non-streaming: GenerationResponse
            Streaming: AsyncIterator[str] (content chunks)
        """
        # Normalize messages
        normalized_messages = [
            msg.to_dict() if isinstance(msg, AgentMessage) else msg
            for msg in messages
        ]
        
        # Build the request payload
        payload: Dict[str, Any] = {
            "model": self.model,
            "messages": normalized_messages,
            "stream": stream,
            "options": {
                "temperature": temperature if temperature is not None else self.temperature,
            },
        }
        
        if num_ctx or self.num_ctx:
            payload["options"]["num_ctx"] = num_ctx or self.num_ctx
        
        if tools:
            payload["tools"] = tools
        
        if format:
            payload["format"] = format
        
        # Send the request
        response = await self._request("POST", "chat", json_data=payload, stream=stream)
        
        if stream:
            return self._stream_content(response)
        return GenerationResponse.from_ollama(response)
    
    async def _stream_content(self, chunks: AsyncIterator[Dict]) -> AsyncIterator[str]:
        """Yield the content fragments from a chunk stream"""
        async for chunk in chunks:
            if content := chunk.get("message", {}).get("content"):
                yield content
    
    # ========== Convenience methods ==========
    
    async def generate(
        self,
        prompt: str,
        *,
        system: Optional[str] = None,
        **kwargs,
    ) -> str:
        """Single-turn generation helper"""
        messages: List[Dict] = [{"role": "user", "content": prompt}]
        if system:
            messages.insert(0, {"role": "system", "content": system})
        
        response = await self.chat(messages, **kwargs)
        return response.content
    
    async def embed(
        self,
        text: str,
        *,
        model: Optional[str] = None,
    ) -> List[float]:
        """Fetch a text embedding"""
        response = await self._request(
            "POST",
            "embed",
            json_data={"model": model or self.model, "input": text},
        )
        # /api/embed returns {"embeddings": [[...]]}; fall back to the legacy
        # singular key for older endpoints
        embeddings = response.get("embeddings")
        if embeddings:
            return embeddings[0]
        return response.get("embedding", [])
    
    async def list_models(self) -> List[Dict]:
        """List locally available models"""
        response = await self._request("GET", "tags")
        return response.get("models", [])
    
    async def health(self) -> bool:
        """Health check"""
        try:
            await self._request("GET", "tags", timeout=2.0)
            return True
        except OllamaError:
            return False
    
    async def pull_model(self, model: str, stream: bool = False) -> Dict | AsyncIterator[Dict]:
        """Pull a model"""
        return await self._request(
            "POST",
            "pull",
            json_data={"name": model, "stream": stream},
            stream=stream,
        )
    
    # ========== Batch support ==========
    
    async def batch_chat(
        self,
        message_lists: List[List[AgentMessage | Dict]],
        **kwargs,
    ) -> List[GenerationResponse]:
        """Run several chats concurrently"""
        tasks = [self.chat(messages, **kwargs) for messages in message_lists]
        return await asyncio.gather(*tasks)

async def main():
    """Walk through the OllamaClient feature set"""
    
    # 1. Create the client
    async with OllamaClient(
        base_url="http://localhost:11434",
        model="llama3.1:8b",
        timeout=60.0,
        num_ctx=4096,
    ) as client:
        # ========== Health check ==========
        print("=" * 50)
        print("[Health check]")
        is_healthy = await client.health()
        print(f"Service status: {'✓ OK' if is_healthy else '✗ unavailable'}")
        print()
        
        # ========== Model management ==========
        print("=" * 50)
        print("[Available models]")
        models = await client.list_models()
        for m in models:
            print(f"  - {m['name']}")
        print()
        
        # ========== Single-turn generation ==========
        print("=" * 50)
        print("[Single-turn generation]")
        print("Question: What is AI?")
        print("Answer: ", end="")
        result = await client.generate(
            "Explain AI in one sentence",
            system="You are a concise expert assistant",
            temperature=0.5
        )
        print(result)
        print()
        
        # ========== Multi-turn chat ==========
        print("=" * 50)
        print("[Multi-turn chat]")
        messages = [
            AgentMessage(role="system", content="You are an expert in async Python"),
            AgentMessage(role="user", content="What is asyncio?"),
        ]
        response = await client.chat(messages)
        print(f"Question: {messages[1].content}")
        print(f"Answer: {response.content}")
        print()
        
        # ========== Streaming ==========
        print("=" * 50)
        print("[Streaming]")
        print("Output: ", end="", flush=True)
        # Note: await first to obtain the iterator, then iterate over it
        stream = await client.chat(
            [AgentMessage(role="user", content="Write a short hello world example")],
            stream=True
        )
        async for chunk in stream:
            print(chunk, end="", flush=True)
        print("\n")
        
        # ========== Embeddings ==========
        print("=" * 50)
        print("[Text embedding]")
        embedding = await client.embed("Hello, Ollama!")
        print(f"Vector dimension: {len(embedding)}")
        print(f"First 5 values: {embedding[:5]}")
        print()
        
        # ========== Batch requests ==========
        print("=" * 50)
        print("[Batch requests]")
        batch_messages = [
            [AgentMessage(role="user", content="What is Python?")],
            [AgentMessage(role="user", content="What is JavaScript?")],
            [AgentMessage(role="user", content="What is Rust?")],
        ]
        responses = await client.batch_chat(batch_messages)
        for i, resp in enumerate(responses, 1):
            print(f"{i}. {resp.content[:80]}{'...' if len(resp.content) > 80 else ''}")
        print()
        
        # ========== JSON output ==========
        print("=" * 50)
        print("[JSON output]")
        json_response = await client.chat(
            [AgentMessage(role="user", content="Return a JSON object with name and age fields")],
            format="json"
        )
        print(json_response.content)
        print()
        
    print("=" * 50)
    print("Demo complete")


if __name__ == "__main__":
    asyncio.run(main())

Agent Tool Definitions

For the LLM to know which tools are available, we define a schema for each one. Following OpenAI's Function Calling format, we declare a TOOLS list:

  • tool_get_weather: described as "Query the current weather for a given city", with a required city parameter.
  • tool_calculator: described as "Evaluate a math expression", with a required expression parameter.

We also maintain a TOOL_REGISTRY dict that maps tool-name strings to the actual Python functions, so dispatch reduces to TOOL_REGISTRY[tool_name](**arguments) and the agent logic stays decoupled from the tool implementations.

Tool definition code:
# app/ai_agent/agent_tools.py

import logging

from app.utils.weather import OpenMeteoWeather
from app.utils.pers_math import calculator


TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "tool_get_weather",
            "description": "Query the current weather for a given city",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "City name, e.g. Beijing, Shanghai"
                    }
                },
                "required": ["city"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "tool_calculator",
            "description": "Evaluate a math expression; supports +, -, *, / and parentheses",
            "parameters": {
                "type": "object",
                "properties": {
                    "expression": {
                        "type": "string",
                        "description": "The expression to evaluate, e.g. 123 * 456 or (10 + 20) * 3"
                    }
                },
                "required": ["expression"]
            }
        }
    }
]

def tool_get_weather(city: str) -> str:
    try:
        weather_client = OpenMeteoWeather()
        weather_info = weather_client.get_weather_by_city(city)
        
        if weather_info:
            return weather_client.format_weather_info(weather_info)
        else:
            return "Unable to retrieve weather information"
    except Exception as e:
        # Catch unexpected errors so the agent loop does not crash
        logging.error(f"Error while fetching weather: {e}")
        return f"Error while fetching weather: {str(e)}"


def tool_calculator(expression: str) -> str:
    try:
        ret_dic = calculator(expression)
        # calculator() signals problems via an "error" key rather than raising
        if "error" in ret_dic:
            return f"Calculation failed: {ret_dic['error']}"
        return str(ret_dic["result"])
    except Exception as e:
        # Catch unexpected errors so the agent loop does not crash
        logging.error(f"Calculator error: {e}")
        return f"Calculator error: {str(e)}"

ReAct Agent Main Logic

This is the command center of the whole system: the AgenticBot class implements the complete ReAct loop.

  • System prompt strategy: the system prompt pins down the model's behavior, with special emphasis on "extract parameters accurately" and "self-correct". For example, if the model queries Beijing when the user asked about Shanghai, the prompt pushes it to call the tool again with corrected arguments.
  • ReAct loop:
    • Send the user question plus the conversation history to the LLM.
    • Check whether the LLM response contains tool_calls.
    • If there are no tool calls, return the text as the final answer.
    • If there are tool calls, iterate over them and execute the matching function via TOOL_REGISTRY.
    • Append each tool result back onto the message list as a role: "tool" message.
    • Start the next iteration, until the iteration limit is hit or a final answer is produced.
  • Low temperature: the client is initialized with temperature set to 0, which makes parameter extraction more deterministic and reduces hallucination.

ReAct agent sample code:
# sample/sample.py

import asyncio
from typing import Dict, Any, Callable
from app.ai_agent.agent_tools import tool_calculator, tool_get_weather
from app.ai_agent.agent_tools import TOOLS
from app.core.pers_ollama import OllamaClient


# ========== Tool registry (instead of hard-coded dispatch) ==========
TOOL_REGISTRY: Dict[str, Callable] = {
    "tool_get_weather": tool_get_weather,
    "tool_calculator": tool_calculator,
}


class AgenticBot:
    """An agent built around the ReAct loop"""
    
    # System prompt that constrains the model's behavior
    SYSTEM_PROMPT = (
        "You are an intelligent assistant that can use tools to gather information.\n\n"
        "Follow these rules strictly:\n"
        "1. **Extract parameters accurately**: when calling a tool, take the city or numbers "
        "verbatim from the user's input; never invent them.\n"
        "2. **Self-correct**: if you notice the previous tool call used wrong parameters "
        "(e.g. the user asked about Shanghai but you queried Beijing), "
        "you must call the tool again with the correct parameters to get the right information.\n"
        "3. **Act, don't narrate**: when you spot a mistake, call the tool directly instead of "
        "describing in words what you are going to do.\n"
        "4. **Finish the task**: only give the user a final answer once you have an accurate tool result."
    )


    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        model: str = "llama3.1:8b",
        max_iterations: int = 5,
    ):
        # Default to a low temperature, which suits tool calling
        self.client = OllamaClient(
            base_url=base_url, 
            model=model,
            temperature=0  # 0 minimizes randomness and improves parameter extraction
        )
        self.model = model
        self.max_iterations = max_iterations
    
    async def close(self):
        """Close the client"""
        await self.client.close()
    
    async def __aenter__(self):
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
    
    async def call_tool(self, tool_name: str, arguments: dict, tool_call_id: str) -> Dict[str, Any]:
        """
        Execute the named tool function.
        
        Args:
            tool_name: Tool name
            arguments: Tool arguments
            tool_call_id: Tool-call ID (used to build the result message)
        
        Returns:
            Tool result message
        """
        try:
            if tool_name not in TOOL_REGISTRY:
                raise ValueError(f"Unknown tool: {tool_name}")
            
            print(f"🔧 Calling tool: {tool_name}")
            print(f"📝 Arguments: {arguments}")
            
            # Execute the tool function
            result = TOOL_REGISTRY[tool_name](**arguments)
            print(f"✅ Tool result: {result}\n")
            
            return {
                "role": "tool",
                "content": str(result),
                "name": tool_name,
                "tool_call_id": tool_call_id,
            }
        except Exception as e:
            print(f"❌ Tool execution failed: {e}\n")
            return {
                "role": "tool",
                "content": f"Tool execution error: {str(e)}",
                "name": tool_name,
                "tool_call_id": tool_call_id,
            }
        
    async def chat_with_bot(self, user_message: str) -> str:
        """
        The complete ReAct loop.
        """
        # Build the message list; the system prompt always comes first
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {"role": "user", "content": user_message}
        ]
        
        for iteration in range(self.max_iterations):
            if iteration > 0:
                print(f"🔄 Thinking, round {iteration}...")
            
            # Call the LLM (temperature was already set to 0 at construction)
            response = await self.client.chat(
                messages=messages,
                tools=TOOLS,
            )
            
            # No tool calls: the model has produced its final answer
            if not response.tool_calls:
                return response.content
            
            assistant_message = {
                "role": "assistant",
                "content": response.content,
                "tool_calls": response.tool_calls,
            }
            messages.append(assistant_message)
            
            for tool_call in response.tool_calls:
                tool_name = tool_call["function"]["name"]
                tool_args = tool_call["function"]["arguments"]
                # Some Ollama versions omit the id field on tool calls
                tool_call_id = tool_call.get("id", "")
                
                tool_result_message = await self.call_tool(
                    tool_name, tool_args, tool_call_id
                )
                messages.append(tool_result_message)
        
        # Iteration limit reached: ask for a final answer without tools
        final_response = await self.client.chat(messages=messages)
        return final_response.content



# ========== Main program ==========

async def main():
    """Entry point"""
    test_questions = [
        "Is it cold in Shanghai right now?",
        "What's the weather like in Beijing today?",
        "What is 123 * 456?",
        "Work out (100 - 50) * 2 + 10 for me",
    ]
    
    print("🤖 Weather & math bot is up!")
    print("=" * 50)
    
    async with AgenticBot(
        base_url="http://localhost:11434",
        model="llama3.1:8b",
        max_iterations=5,
    ) as bot:
        for question in test_questions:
            print(f"\n👤 User: {question}")
            print("🤖 Bot: ", end="", flush=True)
            
            try:
                answer = await bot.chat_with_bot(question)
                print(answer)
            except Exception as e:
                print(f"❌ Error: {e}")
            
            print("-" * 50)
    
    print("✅ Done")


if __name__ == "__main__":
    asyncio.run(main())

Running the Example

With all the code files in place, run the main program from the project root (so the app package is importable):

python sample/sample.py
🤖 Weather & math bot is up!
==================================================

👤 User: Is it cold in Shanghai right now?
🤖 Bot: 🔧 Calling tool: tool_get_weather
📝 Arguments: {'city': 'Beijing'}
Geocoding lookup failed: HTTPSConnectionPool(host='geocoding-api.open-meteo.com', port=443): Read timed out. (read timeout=10)
City not found: Beijing
✅ Tool result: Unable to retrieve weather information

🔄 Thinking, round 1...
🔧 Calling tool: tool_get_weather
📝 Arguments: {'city': 'Shanghai'}
✅ Tool result: City: Shanghai
Temperature: 2.4°C
Feels like: -1.5°C
Humidity: 67%
Wind speed: 9.0 km/h
Conditions: Mostly clear

🔄 Thinking, round 2...
Shanghai is quite cold right now; the temperature is only 2.4°C.
--------------------------------------------------

👤 User: What's the weather like in Beijing today?
🤖 Bot: 🔧 Calling tool: tool_get_weather
📝 Arguments: {'city': 'Beijing'}
Geocoding lookup failed: HTTPSConnectionPool(host='geocoding-api.open-meteo.com', port=443): Read timed out. (read timeout=10)
City not found: Beijing
✅ Tool result: Unable to retrieve weather information

🔄 Thinking, round 1...
🔧 Calling tool: tool_get_weather
📝 Arguments: {'city': 'Beijing'}
✅ Tool result: City: Beijing
Temperature: 7.6°C
Feels like: 4.1°C
Humidity: 54%
Wind speed: 7.6 km/h
Conditions: Overcast

🔄 Thinking, round 2...
Beijing is overcast today, 7.6°C with a feels-like of 4.1°C, 54% humidity, and 7.6 km/h wind.
--------------------------------------------------

👤 User: What is 123 * 456?
🤖 Bot: 🔧 Calling tool: tool_calculator
📝 Arguments: {'expression': '123 * 456'}
✅ Tool result: 56088

🔄 Thinking, round 1...
The answer is: 123 * 456 = 56088
--------------------------------------------------

👤 User: Work out (100 - 50) * 2 + 10 for me
🤖 Bot: 🔧 Calling tool: tool_calculator
📝 Arguments: {'expression': '(100 - 50) * 2 + 10'}
✅ Tool result: 110

🔄 Thinking, round 1...
The result is: 110
--------------------------------------------------
✅ Done

The output shows the agent successfully calling tools for both weather lookups and arithmetic. Better still, when the first weather call failed (a wrong city parameter for the Shanghai question, compounded by a network timeout), the agent followed our system prompt, entered another "thinking round", retried the tool call with corrected input, and ultimately got the data. That self-correcting behavior is exactly the point of the ReAct paradigm.


That wraps up this post. Thanks for reading, and if you run into any issues, feel free to leave a comment below.


