Henry
发布于 2025-10-28 / 8 阅读
0
0

Python - 基于 FastAPI 添加 redis 模块

背景简介

在 Python 项目中添加 redis 模块,供其他模块使用。

前置信息

  1. Python 3.11.13 【Conda - 创建 Python 环境】
  2. Redis 7 【Redis - 基于 Docker 部署】

详细信息

文件架构

.
├── app
│   ├── api
│   │   └── v1
│   │       ├── health.py
│   │       └── redis.py 
│   ├── core
│   │   ├── config.py 
│   │   ├── __init__.py
│   │   └── redis.py
│   ├── models
│   │   ├── __init__.py
│   │   └── user_basic_info.py
│   ├── __init__.py
│   └── main.py
├── tests
│   └── test_health.py
├── .env
└── requirements.txt

依赖库

  • 更新 requirements.txt
# other dependencies ....

# redis dependency
redis

Python 代码

  • 更新 .env 文件,添加 redis 连接信息
REDIS_URL=redis://:your_strong_redis_password@localhost:6379/0
  • 更新 app/core/config.py ,添加 redis 连接信息
"""Application settings configuration module.

This module defines the Settings class for managing application configuration
using Pydantic BaseSettings. It loads configuration from environment variables
and a .env file located in the current working directory.
"""

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Typed application configuration backed by Pydantic ``BaseSettings``.

    Values are read from environment variables and from the ``.env`` file
    named in ``model_config`` (a path relative to the current working
    directory). Only the Redis setting is spelled out here; the remaining
    settings are elided.
    """

    model_config = SettingsConfigDict(
        case_sensitive=False,
        extra="ignore",
        env_file_encoding="utf-8",
        env_file="./.env",
    )

    # other settings ...

    # redis settings
    REDIS_URL: str  # Redis connection URL, supplied via environment / .env


# Global settings instance, built once when this module is imported; other
# modules obtain it with `from app.core.config import settings`.
settings = Settings()
  • 新增 app/core/redis.py redis 核心模块
# app/core/redis.py

from fastapi import Request, Depends
from redis.asyncio import Redis as AsyncRedis
from typing import Annotated

async def get_redis_client(request: Request) -> AsyncRedis:
    """Return the Redis client stored on the application's state.

    Intended as a FastAPI dependency: it looks up ``request.app.state.redis``
    and hands that object to the caller, so handlers reuse whatever client
    was placed there instead of building their own.

    Args:
        request: The incoming request; used only to reach the application
            object and its ``state``.

    Returns:
        The object stored as ``app.state.redis``.
    """
    app_state = request.app.state
    return app_state.redis

# Type alias that bundles the AsyncRedis type with Depends(get_redis_client),
# so a route can declare `redis: RedisDep` and receive the client returned by
# get_redis_client. Using Annotated + Depends keeps function signatures clean
# and is the recommended way to inject dependencies in FastAPI.
RedisDep = Annotated[AsyncRedis, Depends(get_redis_client)]
  • 添加 app/api/v1/redis.py
from fastapi import APIRouter, Depends, HTTPException, status, Query
from fastapi.responses import JSONResponse
from typing import Annotated
from app.core.redis import RedisDep 

# Router that collects the Redis cache endpoints defined in this module.
redis_router = APIRouter()

# --- Error codes ---
# Application-level codes returned in the "code" field of error responses
# (successful responses use code 0).
ERR_ADD_REDIS_KV_FAIL = 10001  # writing a key raised an error (set_cache_item)
ERR_GET_REDIS_KV_FAIL = 10002  # reading a key raised an error (get_cache_item)
ERR_GET_REDIS_KV_NULL = 10003  # requested key was not found (get_cache_item)


@redis_router.post("/cache/{key}", summary="Set redis key-value", status_code=status.HTTP_200_OK)
async def set_cache_item(
    key: str,
    value: str,
    redis: RedisDep,
    ex: Annotated[
        int | None,
        Query(ge=1, description="Set an expire flag, specified in seconds."),
    ] = None,
):
    """Sets a key-value pair in Redis, with an optional expiration time.

    Args:
        key: Cache key, taken from the URL path.
        value: Value to store, taken from the query string.
        redis: Redis client injected through ``RedisDep``.
        ex: Optional time-to-live in seconds. Must be at least 1: smaller
            values are rejected during request validation instead of being
            forwarded to Redis, which refuses a non-positive expire time and
            would otherwise surface here as an internal server error.

    Returns:
        A ``JSONResponse`` with code 0 on success, or an HTTP 500 response
        carrying ``ERR_ADD_REDIS_KV_FAIL`` if the write raises.
    """
    try:
        await redis.set(key, value, ex=ex)
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content={"code":0, "message": f"Item '{key}' cached successfully."},
        )
    except Exception as e:  # endpoint boundary: report any failure as an API error
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,  # internal error
            content={"code":ERR_ADD_REDIS_KV_FAIL, "message": f"Failed to set cache item: {str(e)}"},
        )
    
@redis_router.get("/cache/{key}", summary="Get redis key-value", status_code=status.HTTP_200_OK)
async def get_cache_item(
    key: str,
    redis: RedisDep
):
    """Look up ``key`` in Redis and return its value.

    Args:
        key: Cache key, taken from the URL path.
        redis: Redis client injected through ``RedisDep``.

    Returns:
        A ``JSONResponse``: HTTP 200 with the key and value when the key
        exists, HTTP 404 with ``ERR_GET_REDIS_KV_NULL`` when Redis returns
        ``None``, or HTTP 500 with ``ERR_GET_REDIS_KV_FAIL`` if anything in
        the lookup or response construction raises.
    """
    try:
        cached = await redis.get(key)
        if cached is None:
            # Redis answered, but there is nothing stored under this key.
            http_status = status.HTTP_404_NOT_FOUND
            body = {"code":ERR_GET_REDIS_KV_NULL, "message": f"Item '{key}' not found in cache."}
        else:
            http_status = status.HTTP_200_OK
            body = {"code":0, "message":"success", "data": {"key": key, "value": cached}}
        # Built inside the try so a failure while rendering the response is
        # reported through the same error path as a failed lookup.
        return JSONResponse(status_code=http_status, content=body)
    except Exception as exc:
        error_body = {"code":ERR_GET_REDIS_KV_FAIL, "message": f"Failed to get cache item: {str(exc)}"}
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content=error_body,
        )
  • 更新 app/main.py,添加 redis 模块
# app/main.py

from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.api.v1.health import health_router
from app.api.v1.redis import redis_router
from app.core.config import settings
from redis.asyncio import Redis as AsyncRedis # 导入异步客户端


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manages application lifecycle events.

    On startup a shared async Redis client is created from
    ``settings.REDIS_URL``, stored on ``app.state.redis`` and checked with a
    ``ping``. On shutdown the client is closed.

    The client is released on every exit path: if the startup ping fails the
    client is closed before the error propagates, and the shutdown section
    runs in a ``finally`` so it still executes when an exception is raised
    into the context manager while the application is running.

    Args:
        app: The FastAPI application whose ``state`` receives the client.

    Yields:
        None. Control returns to the framework while the application runs.
    """
    # --- Startup ---
    print("Application startup...")
    # Create the Redis client and keep it on app.state so request handlers
    # can reach it through the application object.
    client = AsyncRedis.from_url(
        settings.REDIS_URL,
        encoding="utf-8",
        decode_responses=True,
    )
    app.state.redis = client
    try:
        # Fail fast: verify the connection before accepting requests.
        await client.ping()
    except Exception:
        # Do not leave a half-initialised client behind when startup aborts.
        await client.aclose()
        raise
    print("Redis client connected and initialized.")

    try:
        yield  # The application runs here, serving requests
    finally:
        # --- Shutdown ---
        print("Application shutdown...")
        # aclose() is the asyncio client's close method (redis-py >= 5.0.1);
        # the older close() spelling is deprecated there.
        await client.aclose()
        print("Redis client closed.")


def create_app() -> FastAPI:
    """Build the FastAPI application and attach its routers.

    The application metadata (title, version, debug flag) is read from
    ``settings`` and ``lifespan`` is installed as the lifespan handler. The
    health and redis routers are both mounted under ``/api/v1``.

    Returns:
        FastAPI: The configured application instance.
    """
    api_prefix = "/api/v1"
    application = FastAPI(
        lifespan=lifespan,
        description="Standard FastAPI Structure。",
        debug=settings.DEBUG,
        version=settings.VERSION,
        title=settings.APP_NAME,
    )

    # Register the routers, each under the shared prefix with its own tag.
    for router, tag in ((health_router, "Health"), (redis_router, "redis")):
        application.include_router(router, prefix=api_prefix, tags=[tag])

    return application


# Module-level ASGI application; `uvicorn app.main:app` loads this name.
app = create_app()

验证

  • 启动服务
uvicorn app.main:app --host 0.0.0.0 --port 8080
INFO:     Started server process [969435]
INFO:     Waiting for application startup.
Application startup...
Redis client connected and initialized.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
  • 设置键值对
curl -X POST "http://localhost:8080/api/v1/cache/mykey?value=myvalue&ex=60"
{"code":0,"message":"Item 'mykey' cached successfully."}
  • 获取键值对
curl -X GET "http://localhost:8080/api/v1/cache/mykey"
{"code":0,"message":"success","data":{"key":"mykey","value":"myvalue"}}
  • 测试不存在的 Key
curl -X GET "http://localhost:8080/api/v1/cache/mykey2"
{"code":10003,"message":"Item 'mykey2' not found in cache."}

以上便是本文的全部内容,感谢您的阅读,如遇到任何问题,欢迎在评论区留言讨论。



评论