背景简介
在基于 FastAPI 的 Python 项目中添加 Redis 模块,供其他模块通过依赖注入使用。
前置信息
- Python 3.11.13 【Conda - 创建 Python 环境】
- Redis 7 【Redis - 基于 Docker 部署】
详细信息
文件架构
.
├── app
│ ├── api
│ │ └── v1
│ │ ├── health.py
│ │ └── redis.py
│ ├── core
│ │ ├── config.py
│ │ ├── __init__.py
│ │ └── redis.py
│ ├── models
│ │ ├── __init__.py
│ │ └── user_basic_info.py
│ ├── __init__.py
│ └── main.py
├── tests
│ └── test_health.py
├── .env
└── requirements.txt
依赖库
- 更新 requirements.txt,添加 redis 依赖
# other dependencies ....
# redis dependency
redis
Python 代码
- 更新 .env 文件,添加 Redis 连接信息
REDIS_URL=redis://:your_strong_redis_password@localhost:6379/0
- 更新 app/core/config.py,添加 Redis 连接信息
"""Application settings configuration module.

This module defines the Settings class for managing application configuration
using Pydantic BaseSettings. It loads configuration from environment variables
and from a `.env` file resolved relative to the current working directory
(`./.env`), so the application should be started from the project root.
"""
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Typed application configuration loaded by pydantic-settings.

    Field values come from environment variables and from the ``./.env``
    file. That path is relative, so it is resolved against the directory
    the process is started from.
    """

    # Loader options: names are matched case-insensitively and entries that
    # do not correspond to a declared field are ignored.
    model_config = SettingsConfigDict(
        case_sensitive=False,
        extra="ignore",
        env_file_encoding="utf-8",
        env_file="./.env",
    )

    # other settings ...

    # redis settings
    # Connection URL for Redis. Declared without a default, so it must be
    # supplied via the environment or the .env file.
    REDIS_URL: str
# Global settings instance, created once when this module is imported.
# Other modules use it via `from app.core.config import settings`.
settings = Settings()
- 新增 app/core/redis.py(Redis 核心模块)
# app/core/redis.py
from fastapi import Request, Depends
from redis.asyncio import Redis as AsyncRedis
from typing import Annotated
async def get_redis_client(request: Request) -> AsyncRedis:
    """Return the Redis client kept on the application's state object.

    Intended for use as a FastAPI dependency: it reads
    ``request.app.state.redis`` and returns it unchanged, so every caller
    receives the same object that was stored there.

    Args:
        request: The incoming request; used only to reach the application
            object and its ``state``.

    Returns:
        The object stored at ``app.state.redis``.
    """
    app_state = request.app.state
    return app_state.redis
# Type alias that bundles the AsyncRedis annotation with the
# `Depends(get_redis_client)` marker. A route handler can declare a parameter
# as `redis: RedisDep` and FastAPI will inject the value returned by
# get_redis_client.
RedisDep = Annotated[AsyncRedis, Depends(get_redis_client)]
- 添加 app/api/v1/redis.py(Redis 缓存读写接口)
from fastapi import APIRouter, Depends, HTTPException, status, Query
from fastapi.responses import JSONResponse
from typing import Annotated
from app.core.redis import RedisDep
# Router that groups the cache endpoints defined in this module.
redis_router = APIRouter()
# --- Error Code ---
# Application-level codes returned in the "code" field of the JSON body
# (0 is used for success).
ERR_ADD_REDIS_KV_FAIL = 10001  # writing a key-value pair failed
ERR_GET_REDIS_KV_FAIL = 10002  # reading a key raised an error
ERR_GET_REDIS_KV_NULL = 10003  # the requested key does not exist
@redis_router.post("/cache/{key}", summary="Set redis key-value", status_code=status.HTTP_200_OK)
async def set_cache_item(
    key: str,
    value: str,
    redis: RedisDep,
    ex: Annotated[
        int | None,
        Query(ge=1, description="Set an expire flag, specified in seconds."),
    ] = None,
):
    """Store ``value`` under ``key`` in Redis, optionally with an expiry.

    Args:
        key: Cache key, taken from the URL path.
        value: String to store, taken from the query string.
        redis: Redis client injected through ``RedisDep``.
        ex: Optional time-to-live in seconds. Must be at least 1: Redis
            rejects a zero or negative expire time, so such values are
            refused during request validation instead of surfacing as an
            internal server error.

    Returns:
        A JSONResponse with HTTP 200 and ``code`` 0 on success, or HTTP 500
        with ``code`` ERR_ADD_REDIS_KV_FAIL when the Redis call raises.
    """
    try:
        # Only the Redis call can fail here, so keep the try body minimal.
        await redis.set(key, value, ex=ex)
    except Exception as e:  # endpoint boundary: report the failure in the JSON envelope
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,  # internal error
            content={"code":ERR_ADD_REDIS_KV_FAIL, "message": f"Failed to set cache item: {str(e)}"},
        )
    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content={"code":0, "message": f"Item '{key}' cached successfully."},
    )
@redis_router.get("/cache/{key}", summary="Get redis key-value", status_code=status.HTTP_200_OK)
async def get_cache_item(
    key: str,
    redis: RedisDep
):
    """Look up ``key`` in Redis and return its value in a JSON envelope.

    Args:
        key: Cache key, taken from the URL path.
        redis: Redis client injected through ``RedisDep``.

    Returns:
        A JSONResponse: HTTP 200 with the key and value when the key exists,
        HTTP 404 with ``code`` ERR_GET_REDIS_KV_NULL when Redis returns
        ``None``, or HTTP 500 with ``code`` ERR_GET_REDIS_KV_FAIL when
        anything inside the lookup raises.
    """
    try:
        cached = await redis.get(key)
        if cached is not None:
            found_body = {"code":0, "message":"success", "data": {"key": key, "value": cached}}
            return JSONResponse(status_code=status.HTTP_200_OK, content=found_body)
        # Redis returned None: the key is absent.
        missing_body = {"code":ERR_GET_REDIS_KV_NULL, "message": f"Item '{key}' not found in cache."}
        return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=missing_body)
    except Exception as exc:
        failure_body = {"code":ERR_GET_REDIS_KV_FAIL, "message": f"Failed to get cache item: {exc!s}"}
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content=failure_body,
        )
- 更新 app/main.py,添加 Redis 模块
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.api.v1.health import health_router
from app.api.v1.redis import redis_router
from app.core.config import settings
from redis.asyncio import Redis as AsyncRedis # 导入异步客户端
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the Redis client across the application's lifetime.

    On startup a single async Redis client is created from
    ``settings.REDIS_URL``, stored on ``app.state.redis`` and verified with
    ``PING``. On shutdown the client is closed.

    Args:
        app: The FastAPI application whose ``state`` holds the client.

    Yields:
        Nothing; control returns to the framework while the app serves
        requests.

    Raises:
        Any exception raised by the ``PING`` check propagates, which aborts
        startup. The client is still closed in that case.
    """
    # --- Startup ---
    print("Application startup...")
    # Create the Redis client and keep it on app.state for the dependencies.
    redis_client = AsyncRedis.from_url(
        settings.REDIS_URL,
        encoding="utf-8",
        decode_responses=True,
    )
    app.state.redis = redis_client
    try:
        # Verify that the connection actually works before serving traffic.
        await redis_client.ping()
        print("Redis client connected and initialized.")
        yield  # The application runs here and handles requests.
        # --- Shutdown ---
        print("Application shutdown...")
    finally:
        # Always release the client, including when the ping above fails or
        # an exception is raised into the generator at the yield.
        # aclose() replaces the async close(), which is deprecated since
        # redis-py 5.0.1.
        await redis_client.aclose()
        print("Redis client closed.")
def create_app() -> FastAPI:
    """Build and configure the FastAPI application.

    The application metadata is taken from ``settings``, the ``lifespan``
    handler is attached, and the health and redis routers are mounted under
    the ``/api/v1`` prefix.

    Returns:
        The configured FastAPI instance.
    """
    application = FastAPI(
        lifespan=lifespan,
        title=settings.APP_NAME,
        version=settings.VERSION,
        debug=settings.DEBUG,
        description="Standard FastAPI Structure。",
    )
    # Register the routers in a fixed order, each with its own tag.
    routers_with_tags = (
        (health_router, "Health"),
        (redis_router, "redis"),
    )
    for router, tag in routers_with_tags:
        application.include_router(router, prefix="/api/v1", tags=[tag])
    return application
# Module-level ASGI application; the uvicorn command loads it as "app.main:app".
app = create_app()
验证
- 启动服务
uvicorn app.main:app --host 0.0.0.0 --port 8080
INFO: Started server process [969435]
INFO: Waiting for application startup.
Application startup...
Redis client connected and initialized.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
- 设置键值对
curl -X POST "http://localhost:8080/api/v1/cache/mykey?value=myvalue&ex=60"
{"code":0,"message":"Item 'mykey' cached successfully."}
- 获取键值对
curl -X GET "http://localhost:8080/api/v1/cache/mykey"
{"code":0,"message":"success","data":{"key":"mykey","value":"myvalue"}}
- 测试不存在的 Key
curl -X GET "http://localhost:8080/api/v1/cache/mykey2"
{"code":10003,"message":"Item 'mykey2' not found in cache."}
以上便是本文的全部内容,感谢您的阅读,如遇到任何问题,欢迎在评论区留言讨论。