Background
With the rapid advance of large language model (LLM) technology, making models not just "talk" but also "act" has become a central theme of Agent research. This post walks through building, from scratch, an intelligent Agent with real tool-calling capabilities, based on Python 3.11 and a locally deployed Ollama (Llama3.1:8b) model.
The project covers two core features, weather lookup and math calculation, and demonstrates the complete Function Calling loop. We focus on applying the ReAct paradigm, and on defining a well-formed Tool Schema that helps the LLM reliably emit structured JSON instructions, so that the model can consume real-world data and perform logical operations.
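To make "structured JSON instructions" concrete up front: for a question like "What's the weather in Beijing?", we want the model to reply not with prose but with a tool call, roughly of the following shape. This is illustrative only; it follows the OpenAI-style schema defined later in agent_tools.py, and the exact wire format varies by Ollama version.
# Illustration only: the structured instruction we want the LLM to produce
{"function": {"name": "tool_get_weather", "arguments": {"city": "Beijing"}}}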
Prerequisites
- Python 3.11.13 [Conda - Creating a Python Environment]
- Ollama [Ollama - Deploying DeepSeek-R1:7B]
Details
ReAct Flow
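ReAct (Reasoning + Acting) interleaves the model's reasoning with tool use: the model receives the question and decides whether a tool is needed (Thought), emits a structured tool call (Action), and the tool's output is appended back into the conversation (Observation). The cycle repeats until the model can answer directly or a maximum number of iterations is reached; the AgenticBot implementation below follows exactly this loop.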
Implementation
Installing Dependencies
# requirements.txt
# HTTP request libraries
requests
httpx
# Retry logic
tenacity
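With the requirements file in place, install everything into the Python 3.11 environment prepared above:
pip install -r requirements.txt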
Project Structure
The code is organized into three parts, laid out as shown below: the tool implementations, the LLM client wrapper, and the Agent logic.
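Concretely, the module paths used throughout this post map to the following layout (package __init__.py files omitted):
app/
├── core/
│   └── pers_ollama.py     # async Ollama client wrapper
├── utils/
│   ├── weather.py         # weather lookup tool (Open-Meteo)
│   └── pers_math.py       # safe math calculator (AST-based)
└── ai_agent/
    └── agent_tools.py     # tool schemas and tool implementations
sample/
└── sample.py              # ReAct Agent entry point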
Weather Lookup Tool
We use Open-Meteo as the weather data source: it is free and requires no API key.
This module implements a complete weather-lookup class, OpenMeteoWeather, with the following core features:
- Geocoding: converts a city name (Chinese names supported) into latitude/longitude coordinates.
- Current weather: fetches the current temperature, humidity, wind speed, and conditions for given coordinates.
- Data parsing: maps WMO weather codes to human-readable descriptions (e.g. "Clear", "Heavy rain").
- Error handling: wraps network requests in exception handling, so the main program does not crash when the service is unavailable.
- Weather tool code
# app/utils/weather.py
import requests
from typing import Optional, Dict, Any
from dataclasses import dataclass
from enum import Enum


class WeatherCode(Enum):
    """WMO weather code enum (the codes used by Open-Meteo)"""
    CLEAR = 0                   # Clear sky
    MAINLY_CLEAR = 1            # Mainly clear
    PARTLY_CLOUDY = 2           # Partly cloudy
    OVERCAST = 3                # Overcast
    FOG = 45                    # Fog
    DEPOSITING_RIME_FOG = 48    # Depositing rime fog
    DRIZZLE_LIGHT = 51          # Light drizzle
    DRIZZLE_MODERATE = 53       # Moderate drizzle
    DRIZZLE_DENSE = 55          # Dense drizzle
    RAIN_FREEZING_LIGHT = 56    # Light freezing drizzle
    RAIN_FREEZING_HEAVY = 57    # Dense freezing drizzle
    RAIN_SLIGHT = 61            # Slight rain
    RAIN_MODERATE = 63          # Moderate rain
    RAIN_HEAVY = 65             # Heavy rain
    SNOW_FALL_SLIGHT = 71       # Slight snowfall
    SNOW_FALL_MODERATE = 73     # Moderate snowfall
    SNOW_FALL_HEAVY = 75        # Heavy snowfall
    RAIN_SHOWERS_LIGHT = 80     # Slight rain showers
    RAIN_SHOWERS_MODERATE = 81  # Moderate rain showers
    RAIN_SHOWERS_VIOLENT = 82   # Violent rain showers
    THUNDERSTORM = 95           # Thunderstorm
    THUNDERSTORM_HAIL = 96      # Thunderstorm with hail


@dataclass
class WeatherInfo:
    """Weather information data class"""
    city: str
    temperature: float
    humidity: int
    wind_speed: float
    weather_code: int
    weather_desc: str
    feels_like: Optional[float] = None


class OpenMeteoWeather:
    """Open-Meteo weather API client"""

    def __init__(self):
        self.base_url = "https://api.open-meteo.com/v1/forecast"
        self.geocoding_url = "https://geocoding-api.open-meteo.com/v1/search"

    def get_city_coordinates(self, city_name: str) -> Optional[tuple[float, float]]:
        """
        Resolve a city name to latitude/longitude coordinates
        Args:
            city_name: city name (Chinese and English both work)
        Returns:
            a (latitude, longitude) tuple, or None on failure
        """
        try:
            params = {
                "name": city_name,
                "count": 1,
                "language": "zh",  # language of the returned place names; matching works for both Chinese and English input
                "format": "json"
            }
            response = requests.get(self.geocoding_url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            if data.get("results") and len(data["results"]) > 0:
                location = data["results"][0]
                lat = location.get("latitude")
                lon = location.get("longitude")
                return (lat, lon)
            return None
        except requests.exceptions.RequestException as e:
            print(f"City geocoding failed: {e}")
            return None

    def get_current_weather(self, lat: float, lon: float, timezone: str = "auto") -> Optional[Dict[str, Any]]:
        """
        Fetch the current weather
        Args:
            lat: latitude
            lon: longitude
            timezone: time zone
        Returns:
            a weather data dict, or None on failure
        """
        try:
            params = {
                "latitude": lat,
                "longitude": lon,
                "current": "temperature_2m,relative_humidity_2m,apparent_temperature,weather_code,wind_speed_10m",
                "timezone": timezone,
                "forecast_days": 1
            }
            response = requests.get(self.base_url, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Weather fetch failed: {e}")
            return None

    def get_forecast_weather(self, lat: float, lon: float, days: int = 7, timezone: str = "auto") -> Optional[Dict[str, Any]]:
        """
        Fetch the weather forecast
        Args:
            lat: latitude
            lon: longitude
            days: number of forecast days
            timezone: time zone
        Returns:
            a forecast data dict, or None on failure
        """
        try:
            params = {
                "latitude": lat,
                "longitude": lon,
                "daily": "temperature_2m_max,temperature_2m_min,weather_code,precipitation_sum",
                "timezone": timezone,
                "forecast_days": min(days, 16)  # Open-Meteo supports at most 16 forecast days
            }
            response = requests.get(self.base_url, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Forecast fetch failed: {e}")
            return None

    def get_weather_by_city(self, city_name: str) -> Optional[WeatherInfo]:
        """
        Get the weather for a city name (the full pipeline)
        Args:
            city_name: city name
        Returns:
            a WeatherInfo object, or None on failure
        """
        # 1. Resolve the city coordinates
        coordinates = self.get_city_coordinates(city_name)
        if not coordinates:
            print(f"City not found: {city_name}")
            return None
        lat, lon = coordinates
        # 2. Fetch the weather data
        weather_data = self.get_current_weather(lat, lon)
        if not weather_data:
            return None
        # 3. Parse the weather data
        current = weather_data.get("current", {})
        weather_code = current.get("weather_code", 0)
        return WeatherInfo(
            city=city_name,
            temperature=current.get("temperature_2m", 0),
            humidity=current.get("relative_humidity_2m", 0),
            wind_speed=current.get("wind_speed_10m", 0),
            weather_code=weather_code,
            weather_desc=self.weather_code_to_desc(weather_code),
            feels_like=current.get("apparent_temperature")
        )

    def weather_code_to_desc(self, code: int) -> str:
        """Translate a WMO weather code into a readable description"""
        weather_map = {
            0: "Clear",
            1: "Mainly clear",
            2: "Partly cloudy",
            3: "Overcast",
            45: "Fog",
            48: "Depositing rime fog",
            51: "Light drizzle",
            53: "Moderate drizzle",
            55: "Dense drizzle",
            56: "Light freezing drizzle",
            57: "Dense freezing drizzle",
            61: "Slight rain",
            63: "Moderate rain",
            65: "Heavy rain",
            66: "Light freezing rain",
            67: "Heavy freezing rain",
            71: "Slight snowfall",
            73: "Moderate snowfall",
            75: "Heavy snowfall",
            77: "Snow grains",
            80: "Slight rain showers",
            81: "Moderate rain showers",
            82: "Violent rain showers",
            85: "Slight snow showers",
            86: "Heavy snow showers",
            95: "Thunderstorm",
            96: "Thunderstorm with slight hail",
            99: "Thunderstorm with heavy hail"
        }
        return weather_map.get(code, "Unknown")

    def format_weather_info(self, weather_info: WeatherInfo) -> str:
        """Format weather information for display"""
        if not weather_info:
            return "Unable to retrieve weather information"
        # feels_like_display already carries its unit, so the template must not append another °C
        feels_like_display = f"{weather_info.feels_like}°C" if weather_info.feels_like is not None else "N/A"
        output = f"""
City: {weather_info.city}
Temperature: {weather_info.temperature}°C
Feels like: {feels_like_display}
Humidity: {weather_info.humidity}%
Wind speed: {weather_info.wind_speed} km/h
Conditions: {weather_info.weather_desc}
""".strip()
        return output

    def format_forecast(self, forecast_data: Dict[str, Any]) -> str:
        """Format a forecast for display"""
        if not forecast_data:
            return "Unable to retrieve the forecast"
        daily = forecast_data.get("daily", {})
        dates = daily.get("time", [])
        max_temps = daily.get("temperature_2m_max", [])
        min_temps = daily.get("temperature_2m_min", [])
        weather_codes = daily.get("weather_code", [])
        output = ["📅 Forecast:\n"]
        for i, date in enumerate(dates):
            max_temp = max_temps[i] if i < len(max_temps) else "N/A"
            min_temp = min_temps[i] if i < len(min_temps) else "N/A"
            weather_code = weather_codes[i] if i < len(weather_codes) else 0
            weather_desc = self.weather_code_to_desc(weather_code)
            output.append(f"  {date}: {min_temp}°C ~ {max_temp}°C, {weather_desc}")
        return "\n".join(output)


# Usage example
def main():
    # Create the weather API client
    weather = OpenMeteoWeather()
    print("=" * 40)
    print("Open-Meteo weather lookup")
    print("=" * 40)
    # Example 1: lookup by city name
    print("\n🏙️ City lookup examples:")
    cities = ["Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou"]
    for city in cities:
        weather_info = weather.get_weather_by_city(city)
        if weather_info:
            print(f"\n{weather.format_weather_info(weather_info)}")
            print("-" * 30)
    # Example 2: lookup by coordinates
    print("\n📍 Coordinate lookup example:")
    # Beijing coordinates
    lat, lon = 39.9042, 116.4074
    weather_data = weather.get_current_weather(lat, lon)
    if weather_data:
        current = weather_data["current"]
        weather_code = current["weather_code"]
        print(f"Beijing coordinates ({lat}, {lon}):")
        print(f"Temperature: {current['temperature_2m']}°C")
        print(f"Feels like: {current['apparent_temperature']}°C")
        print(f"Humidity: {current['relative_humidity_2m']}%")
        print(f"Wind speed: {current['wind_speed_10m']} km/h")
        print(f"Conditions: {weather.weather_code_to_desc(weather_code)}")
    # Example 3: forecast
    print("\n📊 Forecast example:")
    forecast = weather.get_forecast_weather(lat, lon, days=7)
    if forecast:
        print(weather.format_forecast(forecast))
    # Example 4: interactive lookup
    print("\n🔍 Interactive lookup (enter q to quit):")
    while True:
        city = input("\nEnter a city name: ").strip()
        if city.lower() == 'q':
            break
        if not city:
            continue
        weather_info = weather.get_weather_by_city(city)
        if weather_info:
            print(weather.format_weather_info(weather_info))
        else:
            print(f"❌ Lookup failed: city '{city}' not found")


if __name__ == "__main__":
    main()
Safe Math Calculator
An LLM-generated math expression could contain malicious code (e.g. __import__('os').system('rm -rf /')), so instead of calling Python's eval() directly we implement a safe evaluator based on the AST (abstract syntax tree).
- Operator whitelist: only basic arithmetic operators (ast.Add, ast.Sub, etc.) are allowed.
- Safe evaluation: the AST is walked recursively to evaluate numeric expressions; any function call or non-numeric constant raises an exception immediately.
- Structured output: returns a dict containing the expression and its result, which is easy for the Agent to parse.
- Calculator code
# app/utils/pers_math.py
import ast
import operator

# Whitelist of allowed operators and their implementations
_ALLOWED_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
    ast.USub: operator.neg,  # unary minus, e.g. -5
}


def _eval_node(node):
    """Recursively evaluate an AST node"""
    # Numbers (Python 3.8+ uses ast.Constant; older versions used ast.Num)
    if isinstance(node, ast.Constant):
        if isinstance(node.value, (int, float)):
            return node.value
        raise TypeError(f"Unsupported constant type: {type(node.value)}")
    # Binary operations (e.g. 1 + 2)
    if isinstance(node, ast.BinOp):
        left = _eval_node(node.left)
        right = _eval_node(node.right)
        op_type = type(node.op)
        if op_type in _ALLOWED_OPERATORS:
            try:
                return _ALLOWED_OPERATORS[op_type](left, right)
            except ZeroDivisionError:
                raise ZeroDivisionError("Division by zero")
        else:
            raise TypeError(f"Unsupported binary operator: {op_type}")
    # Unary operations (e.g. -5)
    if isinstance(node, ast.UnaryOp):
        operand = _eval_node(node.operand)
        op_type = type(node.op)
        if op_type in _ALLOWED_OPERATORS:
            return _ALLOWED_OPERATORS[op_type](operand)
        else:
            raise TypeError(f"Unsupported unary operator: {op_type}")
    # Any other node type (function calls, names, ...) is rejected outright
    raise TypeError(f"Disallowed syntax: {type(node).__name__}")


def calculator(expression: str) -> dict:
    """
    Safely evaluate a math expression
    """
    if not isinstance(expression, str) or not expression.strip():
        return {"error": "Input must be a non-empty string"}
    try:
        # Parse the expression into an AST
        tree = ast.parse(expression, mode='eval')
        # Validate and evaluate
        result = _eval_node(tree.body)
        return {
            "expression": expression.strip(),
            "result": result
        }
    except ZeroDivisionError as e:
        return {"error": str(e)}
    except (SyntaxError, TypeError, ValueError, MemoryError) as e:
        # Syntax errors, or disallowed characters/structures
        return {"error": f"Invalid expression or disallowed operation: {str(e)}"}
    except Exception as e:
        # Catch-all
        return {"error": f"Calculation error: {str(e)}"}


if __name__ == "__main__":
    print(calculator("55*12"))
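Two quick checks of the whitelist behavior, run from the project root: the parsed AST for a benign expression contains only whitelisted node types, while a function call parses but is rejected at evaluation time. The outputs shown as comments are what I'd expect from the code above.
import ast
from app.utils.pers_math import calculator

# The AST for "55*12" contains only Constant and BinOp nodes:
print(ast.dump(ast.parse("55*12", mode="eval").body))
# BinOp(left=Constant(value=55), op=Mult(), right=Constant(value=12))

# A malicious call parses fine, but _eval_node rejects the ast.Call node:
print(calculator("__import__('os').system('echo hi')"))
# {'error': 'Invalid expression or disallowed operation: Disallowed syntax: Call'}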
Ollama Async Client Wrapper
This is the core component that talks to the local LLM. We use the httpx library to build a fully featured async client, OllamaClient.
- Async I/O: built on Python's asyncio, supporting highly concurrent requests.
- Automatic retries: uses tenacity to retry with exponential backoff on transient network failures and server-side 5xx errors.
- Streaming and non-streaming: a single chat interface supports both one-shot responses and typewriter-style streaming output.
- Native tools support: passes the tools parameter straight to the Ollama API; no manual prompt assembly is needed to simulate tool calling.
- Connection pooling: reuses HTTP connections to reduce handshake overhead.
- Ollama client code
# app/core/pers_ollama.py
import asyncio
import json
import logging
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List, Optional, Literal

import httpx
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)

# ==========================================
# 1. Type definitions
# ==========================================
Role = Literal["system", "user", "assistant", "tool"]


@dataclass
class ToolCall:
    """Tool call structure"""
    id: str
    type: str = "function"
    function: Dict[str, Any] = field(default_factory=dict)


@dataclass
class AgentMessage:
    """Message structure"""
    role: Role
    content: str = ""
    tool_calls: Optional[List[ToolCall]] = None
    tool_call_id: Optional[str] = None

    def to_dict(self) -> Dict:
        """Convert to a plain dict"""
        data = {"role": self.role, "content": self.content}
        if self.tool_calls:
            data["tool_calls"] = [
                {"id": tc.id, "type": tc.type, "function": tc.function}
                for tc in self.tool_calls
            ]
        if self.tool_call_id:
            data["tool_call_id"] = self.tool_call_id
        return data


@dataclass
class GenerationResponse:
    """Generation response structure"""
    role: str
    content: str
    tool_calls: Optional[List[Dict]] = None
    model: Optional[str] = None
    done: bool = False
    prompt_eval_count: int = 0
    eval_count: int = 0

    @classmethod
    def from_ollama(cls, data: Dict) -> "GenerationResponse":
        """Build from a raw Ollama response"""
        message = data.get("message", {})
        tool_calls = message.get("tool_calls")
        return cls(
            role=message.get("role", "assistant"),
            content=message.get("content", ""),
            tool_calls=tool_calls,
            model=data.get("model"),
            done=data.get("done", False),
            prompt_eval_count=data.get("prompt_eval_count", 0),
            eval_count=data.get("eval_count", 0),
        )


# ==========================================
# 2. Exceptions
# ==========================================
class OllamaError(Exception):
    """Base Ollama exception"""
    pass


class OllamaConnectionError(OllamaError):
    """Connection error"""
    pass


class OllamaClientError(OllamaError):
    """Client error (4xx) - not retried"""
    pass


class OllamaServerError(OllamaError):
    """Server error (5xx) - retried"""
    pass


# ==========================================
# 3. Retry configuration
# ==========================================
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((OllamaConnectionError, OllamaServerError)),
    before_sleep=before_sleep_log(logging.getLogger(__name__), logging.WARNING),
    reraise=True,
)
async def _retry_request(request_func):
    """Run a request function under the retry policy"""
    return await request_func()
# ==========================================
# 4. Core client
# ==========================================
class OllamaClient:
    """
    Async Ollama client
    Features:
    - Async I/O
    - Automatic retries (connection errors and 5xx only)
    - Streaming and non-streaming responses
    - Tools support
    - Connection pooling
    """

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        model: str = "llama3.1:8b",
        *,
        timeout: float = 60.0,
        num_ctx: Optional[int] = None,
        temperature: float = 0.7,
        max_connections: int = 10,
    ):
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.timeout = timeout
        self.num_ctx = num_ctx
        self.temperature = temperature
        # HTTP client configuration
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            timeout=httpx.Timeout(timeout),
            limits=httpx.Limits(
                max_keepalive_connections=max_connections // 2,
                max_connections=max_connections,
            ),
        )

    async def close(self) -> None:
        """Close the client"""
        await self._client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    # ========== Internal helpers ==========
    async def _request(
        self,
        method: str,
        endpoint: str,
        *,
        json_data: Optional[Dict] = None,
        stream: bool = False,
        timeout: Optional[float] = None,
    ) -> Dict | AsyncIterator[Dict]:
        """Unified request handling (return type uses the Python 3.10+ `|` union syntax)"""
        async def _do_request():
            try:
                url = f"/api/{endpoint}"
                timeout_val = timeout if timeout is not None else (None if stream else self.timeout)
                if stream:
                    # Stream the body instead of buffering it, so tokens arrive incrementally
                    request = self._client.build_request(
                        method, url, json=json_data, timeout=timeout_val
                    )
                    response = await self._client.send(request, stream=True)
                    if response.status_code >= 400:
                        # Load the body so the error handler below can inspect it
                        await response.aread()
                    response.raise_for_status()
                    return self._iter_lines(response)
                response = await self._client.request(
                    method,
                    url,
                    json=json_data,
                    timeout=timeout_val,
                )
                response.raise_for_status()
                return response.json()
            except httpx.ConnectError as e:
                logging.error(f"Connection failed to {self.base_url}: {e}")
                raise OllamaConnectionError(f"Cannot connect to {self.base_url}") from e
            except httpx.HTTPStatusError as e:
                if e.response.status_code < 500:
                    logging.error(f"Client error ({e.response.status_code}): {e.response.text[:200]}")
                    raise OllamaClientError(f"API error {e.response.status_code}") from e
                logging.warning(f"Server error ({e.response.status_code}), will retry...")
                raise OllamaServerError(f"Server error {e.response.status_code}") from e
        return await _retry_request(_do_request)

    @staticmethod
    async def _iter_lines(response: httpx.Response) -> AsyncIterator[Dict]:
        """Parse a streaming NDJSON response line by line"""
        try:
            async for line in response.aiter_lines():
                line = line.strip()
                if not line or line.startswith("[DONE]"):
                    continue
                try:
                    yield json.loads(line)
                except json.JSONDecodeError:
                    continue
        finally:
            await response.aclose()
    # ========== Chat interface ==========
    async def chat(
        self,
        messages: List[AgentMessage | Dict],
        *,
        tools: Optional[List[Dict]] = None,
        format: Optional[str] = None,
        temperature: Optional[float] = None,
        num_ctx: Optional[int] = None,
        stream: bool = False,
    ) -> GenerationResponse | AsyncIterator[str]:
        """
        Chat interface
        Args:
            messages: message list
            tools: tool definitions
            format: output format ("json")
            temperature: sampling temperature
            num_ctx: context window size
            stream: whether to stream the response
        Returns:
            non-streaming: GenerationResponse
            streaming: AsyncIterator[str] (content fragments)
        """
        # Normalize messages
        normalized_messages = [
            msg.to_dict() if isinstance(msg, AgentMessage) else msg
            for msg in messages
        ]
        # Build the payload
        payload: Dict[str, Any] = {
            "model": self.model,
            "messages": normalized_messages,
            "stream": stream,
            "options": {
                "temperature": temperature if temperature is not None else self.temperature,
            },
        }
        if num_ctx or self.num_ctx:
            payload["options"]["num_ctx"] = num_ctx or self.num_ctx
        if tools:
            payload["tools"] = tools
        if format:
            payload["format"] = format
        # Send the request
        response = await self._request("POST", "chat", json_data=payload, stream=stream)
        if stream:
            return self._stream_content(response)
        return GenerationResponse.from_ollama(response)

    async def _stream_content(self, chunks: AsyncIterator[Dict]) -> AsyncIterator[str]:
        """Extract content fragments from the stream"""
        async for chunk in chunks:
            if content := chunk.get("message", {}).get("content"):
                yield content

    # ========== Convenience methods ==========
    async def generate(
        self,
        prompt: str,
        *,
        system: Optional[str] = None,
        **kwargs,
    ) -> str:
        """Convenient single-turn generation"""
        messages: List[Dict] = [{"role": "user", "content": prompt}]
        if system:
            messages.insert(0, {"role": "system", "content": system})
        response = await self.chat(messages, **kwargs)
        return response.content

    async def embed(
        self,
        text: str,
        *,
        model: Optional[str] = None,
    ) -> List[float]:
        """Get a text embedding"""
        response = await self._request(
            "POST",
            "embed",
            json_data={"model": model or self.model, "input": text},
        )
        # /api/embed returns {"embeddings": [[...]]}; we sent a single input,
        # so take the first vector (with a fallback to the legacy "embedding" key)
        embeddings = response.get("embeddings")
        if embeddings:
            return embeddings[0]
        return response.get("embedding", [])

    async def list_models(self) -> List[Dict]:
        """List available models"""
        response = await self._request("GET", "tags")
        return response.get("models", [])

    async def health(self) -> bool:
        """Health check"""
        try:
            await self._request("GET", "tags", timeout=2.0)
            return True
        except OllamaError:
            return False

    async def pull_model(self, model: str, stream: bool = False) -> Dict | AsyncIterator[Dict]:
        """Pull a model"""
        return await self._request(
            "POST",
            "pull",
            json_data={"name": model, "stream": stream},
            stream=stream,
        )

    # ========== Batch support ==========
    async def batch_chat(
        self,
        message_lists: List[List[AgentMessage | Dict]],
        **kwargs,
    ) -> List[GenerationResponse]:
        """Batch chat"""
        tasks = [self.chat(messages, **kwargs) for messages in message_lists]
        return await asyncio.gather(*tasks)
async def main():
    """Demonstrate the full OllamaClient feature set"""
    # 1. Create the client
    async with OllamaClient(
        base_url="http://localhost:11434",
        model="llama3.1:8b",
        timeout=60.0,
        num_ctx=4096,
    ) as client:
        # ========== Health check ==========
        print("=" * 50)
        print("[Health check]")
        is_healthy = await client.health()
        print(f"Service status: {'✓ OK' if is_healthy else '✗ unavailable'}")
        print()
        # ========== Model management ==========
        print("=" * 50)
        print("[Available models]")
        models = await client.list_models()
        for m in models:
            print(f"  - {m['name']}")
        print()
        # ========== Single-turn generation ==========
        print("=" * 50)
        print("[Single-turn generation]")
        print("Question: What is AI?")
        print("Answer: ", end="")
        result = await client.generate(
            "Explain AI in one sentence",
            system="You are a concise expert assistant",
            temperature=0.5
        )
        print(result)
        print()
        # ========== Multi-turn chat ==========
        print("=" * 50)
        print("[Multi-turn chat]")
        messages = [
            AgentMessage(role="system", content="You are a Python async-programming expert"),
            AgentMessage(role="user", content="What is asyncio?"),
        ]
        response = await client.chat(messages)
        print(f"Question: {messages[1].content}")
        print(f"Answer: {response.content}")
        print()
        # ========== Streaming output ==========
        print("=" * 50)
        print("[Streaming output]")
        print("Output: ", end="", flush=True)
        # Note: await first to obtain the async iterator
        stream = await client.chat(
            [AgentMessage(role="user", content="Write a short hello world example")],
            stream=True
        )
        async for chunk in stream:
            print(chunk, end="", flush=True)
        print("\n")
        # ========== Embeddings ==========
        print("=" * 50)
        print("[Text embedding]")
        embedding = await client.embed("Hello, Ollama!")
        print(f"Vector dimension: {len(embedding)}")
        print(f"First 5 values: {embedding[:5]}")
        print()
        # ========== Batch requests ==========
        print("=" * 50)
        print("[Batch requests]")
        batch_messages = [
            [AgentMessage(role="user", content="What is Python?")],
            [AgentMessage(role="user", content="What is JavaScript?")],
            [AgentMessage(role="user", content="What is Rust?")],
        ]
        responses = await client.batch_chat(batch_messages)
        for i, resp in enumerate(responses, 1):
            print(f"{i}. {resp.content[:80]}{'...' if len(resp.content) > 80 else ''}")
        print()
        # ========== JSON-formatted output ==========
        print("=" * 50)
        print("[JSON-formatted output]")
        json_response = await client.chat(
            [AgentMessage(role="user", content="Return a JSON object with name and age")],
            format="json"
        )
        print(json_response.content)
        print()
        print("=" * 50)
        print("Demo finished")


if __name__ == "__main__":
    asyncio.run(main())
Agent Tool Definitions
For the LLM to know which tools are available, we need to define each tool's schema. Here we follow the OpenAI Function Calling format and define a TOOLS list:
- tool_get_weather: described as "look up the current weather for a given city", with required parameter city.
- tool_calculator: described as "evaluate a math expression", with required parameter expression.
We also maintain a TOOL_REGISTRY dict (defined in sample.py below) that maps tool-name strings to the actual Python functions, decoupling the schemas from the implementations.
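For reference, this is roughly the assistant-message shape the Agent loop below expects when Ollama decides to call a tool. Treat it as illustrative: the exact fields, in particular id, can vary across Ollama versions.
# Illustrative response message (shape assumed; verify against your Ollama version)
{
    "role": "assistant",
    "content": "",
    "tool_calls": [
        {
            "id": "call_0",  # some Ollama versions omit this field
            "type": "function",
            "function": {
                "name": "tool_get_weather",
                "arguments": {"city": "Shanghai"}
            }
        }
    ]
}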
- Tool definition code
# app/ai_agent/agent_tools.py
import logging
from app.utils.weather import OpenMeteoWeather
from app.utils.pers_math import calculator

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "tool_get_weather",
            "description": "Look up the current weather for a given city",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "City name, e.g. Beijing, Shanghai"
                    }
                },
                "required": ["city"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "tool_calculator",
            "description": "Evaluate a math expression; supports +, -, *, / and parentheses",
            "parameters": {
                "type": "object",
                "properties": {
                    "expression": {
                        "type": "string",
                        "description": "The expression to evaluate, e.g. 123 * 456 or (10 + 20) * 3"
                    }
                },
                "required": ["expression"]
            }
        }
    }
]


def tool_get_weather(city: str) -> str:
    try:
        weather_client = OpenMeteoWeather()
        weather_info = weather_client.get_weather_by_city(city)
        if weather_info:
            return weather_client.format_weather_info(weather_info)
        return "Unable to retrieve weather information"
    except Exception as e:
        # Catch unexpected errors so the Agent loop keeps running
        logging.error(f"Error while fetching weather: {e}")
        return f"Error while fetching weather: {str(e)}"


def tool_calculator(expression: str) -> str:
    try:
        ret_dic = calculator(expression)
        # calculator() returns {"error": ...} on failure, so check before indexing
        if "error" in ret_dic:
            return f"Calculation error: {ret_dic['error']}"
        return str(ret_dic["result"])
    except Exception as e:
        # Catch unexpected errors so the Agent loop keeps running
        logging.error(f"Calculation failed: {e}")
        return f"Calculation failed: {str(e)}"
ReAct Agent Main Logic
This is the command center of the whole system: the AgenticBot class implements the complete ReAct loop.
- System Prompt strategy: the system prompt spells out the model's rules of behavior, with particular emphasis on "extract parameters accurately" and "self-correct". For example, if the model mistakenly queries Beijing when the user asked about Shanghai, the prompt forces it to call the tool again with the corrected parameter.
- ReAct loop (a message-level sketch follows this list):
- Send the user question plus the conversation history to the LLM.
- Check whether the LLM response contains tool_calls.
- If there are no tool calls, return the final text answer directly.
- If there are tool calls, iterate over tool_calls and execute each function via TOOL_REGISTRY.
- Append each tool result back to the message list as a role: "tool" message.
- Enter the next iteration, until the maximum iteration count is reached or a final answer is produced.
- Low temperature: the client is initialized with temperature set to 0, making the model stricter when extracting tool parameters and reducing hallucination.
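To make the loop concrete, here is a hedged sketch of how the message list grows over a single ReAct round; the contents are illustrative, since the real values come from the model and the tools.
# Round 0: system rules + user question
messages = [
    {"role": "system", "content": "<system rules from AgenticBot.SYSTEM_PROMPT>"},
    {"role": "user", "content": "Is it cold in Shanghai right now?"},
]
# The LLM responds with a tool call -> record the assistant message
messages.append({
    "role": "assistant",
    "content": "",
    "tool_calls": [{"id": "call_0", "type": "function",
                    "function": {"name": "tool_get_weather",
                                 "arguments": {"city": "Shanghai"}}}],
})
# Execute the tool and feed the observation back as a "tool" message
messages.append({
    "role": "tool",
    "name": "tool_get_weather",
    "tool_call_id": "call_0",
    "content": "City: Shanghai\nTemperature: 2.4°C\n...",
})
# Round 1: the LLM now sees the observation and can answer in plain text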
- ReAct Agent sample code
# sample/sample.py
import asyncio
import json
from typing import Dict, Any, Callable

from app.ai_agent.agent_tools import tool_calculator, tool_get_weather
from app.ai_agent.agent_tools import TOOLS
from app.core.pers_ollama import OllamaClient

# ========== Tool registry (instead of hard-coded dispatch) ==========
TOOL_REGISTRY: Dict[str, Callable] = {
    "tool_get_weather": tool_get_weather,
    "tool_calculator": tool_calculator,
}


class AgenticBot:
    """An intelligent Agent built on the ReAct loop"""

    # System prompt constraining the model's behavior
    SYSTEM_PROMPT = (
        "You are an intelligent assistant that can use tools to gather information.\n\n"
        "Strictly follow these rules:\n"
        "1. **Extract parameters accurately**: when calling a tool, take the city or numbers "
        "verbatim from the user's input; never invent them.\n"
        "2. **Self-correct**: if you notice the previous tool call used wrong parameters "
        "(e.g. the user asked about Shanghai but you queried Beijing), you must call the tool "
        "again with the correct parameters to get the right information.\n"
        "3. **Act, don't narrate**: when you spot a mistake, call the tool directly instead of "
        "describing what you would do.\n"
        "4. **Finish the task**: only give the user a final answer once you have an accurate tool result."
    )

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        model: str = "llama3.1:8b",
        max_iterations: int = 5,
    ):
        # Initialize the client with a low temperature, which suits tool calling
        self.client = OllamaClient(
            base_url=base_url,
            model=model,
            temperature=0  # 0 minimizes randomness and improves parameter extraction
        )
        self.model = model
        self.max_iterations = max_iterations

    async def close(self):
        """Close the client"""
        await self.client.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def call_tool(self, tool_name: str, arguments: dict, tool_call_id: str) -> Dict[str, Any]:
        """
        Execute the named tool function
        Args:
            tool_name: tool name
            arguments: tool arguments
            tool_call_id: tool call ID (used to build the result message)
        Returns:
            a tool-result message
        """
        try:
            if tool_name not in TOOL_REGISTRY:
                raise ValueError(f"Unknown tool: {tool_name}")
            print(f"🔧 Calling tool: {tool_name}")
            print(f"📝 Arguments: {arguments}")
            # Run the tool function
            result = TOOL_REGISTRY[tool_name](**arguments)
            print(f"✅ Tool result: {result}\n")
            return {
                "role": "tool",
                "content": str(result),
                "name": tool_name,
                "tool_call_id": tool_call_id,  # required field
            }
        except Exception as e:
            print(f"❌ Tool execution failed: {e}\n")
            return {
                "role": "tool",
                "content": f"Tool execution error: {str(e)}",
                "name": tool_name,
                "tool_call_id": tool_call_id,
            }

    async def chat_with_bot(self, user_message: str) -> str:
        """
        The complete ReAct loop
        """
        # Build the message list; the system prompt always comes first
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {"role": "user", "content": user_message}
        ]
        for iteration in range(self.max_iterations):
            if iteration > 0:
                print(f"🔄 Thinking round {iteration}...")
            # Call the LLM (temperature was set to 0 at initialization)
            response = await self.client.chat(
                messages=messages,
                tools=TOOLS,
            )
            # No tool calls -> the model has produced its final answer
            if not response.tool_calls:
                return response.content
            # Record the assistant's tool-call message
            assistant_message = {
                "role": "assistant",
                "content": response.content,
                "tool_calls": response.tool_calls,
            }
            messages.append(assistant_message)
            # Execute every requested tool and append the observations
            for tool_call in response.tool_calls:
                tool_name = tool_call["function"]["name"]
                tool_args = tool_call["function"]["arguments"]
                if isinstance(tool_args, str):
                    # Some Ollama versions return arguments as a JSON string
                    tool_args = json.loads(tool_args)
                # Some Ollama versions omit the "id" field, so fall back to ""
                tool_call_id = tool_call.get("id", "")
                tool_result_message = await self.call_tool(
                    tool_name, tool_args, tool_call_id
                )
                messages.append(tool_result_message)
        # Max iterations reached: ask for a final answer without offering tools
        final_response = await self.client.chat(messages=messages)
        return final_response.content


# ========== Main program ==========
async def main():
    """Entry point"""
    test_questions = [
        "Is it cold in Shanghai right now?",
        "What's the weather like in Beijing today?",
        "What is 123 * 456?",
        "Calculate (100 - 50) * 2 + 10 for me",
    ]
    print("🤖 Weather & math bot started!")
    print("=" * 50)
    async with AgenticBot(
        base_url="http://localhost:11434",
        model="llama3.1:8b",
        max_iterations=5,
    ) as bot:
        for question in test_questions:
            print(f"\n👤 User: {question}")
            print("🤖 Bot: ", end="", flush=True)
            try:
                answer = await bot.chat_with_bot(question)
                print(answer)
            except Exception as e:
                print(f"❌ Error: {e}")
            print("-" * 50)
    print("✅ Finished")


if __name__ == "__main__":
    asyncio.run(main())
Running the Demo
With all the code files in place, run the main program from the project root (the app imports in sample.py assume the project root is on the Python path, so add it to PYTHONPATH if the import fails):
python sample/sample.py
🤖 Weather & math bot started!
==================================================
👤 User: Is it cold in Shanghai right now?
🤖 Bot: 🔧 Calling tool: tool_get_weather
📝 Arguments: {'city': 'Beijing'}
City geocoding failed: HTTPSConnectionPool(host='geocoding-api.open-meteo.com', port=443): Read timed out. (read timeout=10)
City not found: Beijing
✅ Tool result: Unable to retrieve weather information
🔄 Thinking round 1...
🔧 Calling tool: tool_get_weather
📝 Arguments: {'city': 'Shanghai'}
✅ Tool result: City: Shanghai
Temperature: 2.4°C
Feels like: -1.5°C
Humidity: 67%
Wind speed: 9.0 km/h
Conditions: Mainly clear
🔄 Thinking round 2...
Shanghai is quite cold right now; the temperature is only 2.4°C.
--------------------------------------------------
👤 User: What's the weather like in Beijing today?
🤖 Bot: 🔧 Calling tool: tool_get_weather
📝 Arguments: {'city': 'Beijing'}
City geocoding failed: HTTPSConnectionPool(host='geocoding-api.open-meteo.com', port=443): Read timed out. (read timeout=10)
City not found: Beijing
✅ Tool result: Unable to retrieve weather information
🔄 Thinking round 1...
🔧 Calling tool: tool_get_weather
📝 Arguments: {'city': 'Beijing'}
✅ Tool result: City: Beijing
Temperature: 7.6°C
Feels like: 4.1°C
Humidity: 54%
Wind speed: 7.6 km/h
Conditions: Overcast
🔄 Thinking round 2...
Beijing is overcast today, 7.6°C with a feels-like temperature of 4.1°C, 54% humidity, and wind at 7.6 km/h.
--------------------------------------------------
👤 User: What is 123 * 456?
🤖 Bot: 🔧 Calling tool: tool_calculator
📝 Arguments: {'expression': '123 * 456'}
✅ Tool result: 56088
🔄 Thinking round 1...
The answer is: 123 * 456 = 56088
--------------------------------------------------
👤 User: Calculate (100 - 50) * 2 + 10 for me
🤖 Bot: 🔧 Calling tool: tool_calculator
📝 Arguments: {'expression': '(100 - 50) * 2 + 10'}
✅ Tool result: 110
🔄 Thinking round 1...
The result is: 110
--------------------------------------------------
✅ Finished
As the output shows, the Agent not only calls the tools successfully to obtain weather data and calculation results; when the first lookup fails due to a network timeout (as in the Beijing query), it follows our System Prompt, autonomously runs further "thinking rounds", retries the tool call, and ultimately retrieves the data. In the first example it even self-corrects a wrongly extracted parameter, querying Beijing although the user asked about Shanghai, then fixing it on the next round. This is the ReAct paradigm working as intended.
That's all for this post. Thanks for reading, and if you run into any problems, feel free to discuss them in the comments.