如何用 FastAPI 和 RBAC 打造坚不可摧的安全堡垒?



扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长

发现1000+提升效率与开发的AI工具和实用程序https://tools.cmdragon.cn/

一、FastAPI 安全认证与 RBAC 系统原理

1.1 RBAC 基础概念

基于角色的访问控制(RBAC)是保护系统资源的经典模型,其核心包含三个要素:

  • 用户(User):系统操作主体
  • 角色(Role):权限的集合载体(如:管理员、普通用户)
  • 权限(Permission):具体操作权限(如:商品删除、订单修改)

1.2 FastAPI 安全组件

核心依赖包及版本要求:

fastapi == 0.95
.2
uvicorn == 0.22
.0
python - jose[cryptography] == 3.3
.0
passlib[bcrypt] == 1.7
.4

安装命令:

pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt]

二、电商平台 RBAC 系统实现

2.1 数据模型定义

使用 Pydantic 和 SQLAlchemy 构建领域模型:

from pydantic import BaseModel
from sqlalchemy import Column, Integer, String, Table, ForeignKey


class PermissionBase(BaseModel):
    code: str  # 权限编码,如 "order:delete"
    description: str


class RoleCreate(BaseModel):
    name: str
    permission_ids: list[int]


# SQLAlchemy 模型
user_role = Table(
    'user_role', Base.metadata,
    Column('user_id', ForeignKey('users.id')),
    Column('role_id', ForeignKey('roles.id'))
)

role_permission = Table(
    'role_permission', Base.metadata,
    Column('role_id', ForeignKey('roles.id')),
    Column('permission_id', ForeignKey('permissions.id'))
)

2.2 认证系统实现

JWT 令牌签发与验证:

from jose import JWTError, jwt
from datetime import datetime, timedelta

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"


def create_access_token(data: dict) -> str:
    expire = datetime.utcnow() + timedelta(hours=1)
    return jwt.encode(
        {**data, "exp": expire},
        SECRET_KEY,
        algorithm=ALGORITHM
    )


async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    try:
        payload = jwt.decode(token, SECRET_KEY, [ALGORITHM])
        return await UserService.get_user(payload["sub"])
    except JWTError:
        raise HTTPException(401, "Invalid token")

2.3 权限校验系统

动态权限依赖注入:

def check_permission(permission_code: str):
    async def _permission_checker(
            user: User = Depends(get_current_user)
    ):
        if not any(p.code == permission_code for p in user.permissions):
            raise HTTPException(403, "Permission denied")

    return Depends(_permission_checker)


# 在路由中使用
@app.delete("/orders/{order_id}")
async def delete_order(
        order_id: str,
        _=check_permission("order:delete")
):
    return OrderService.delete_order(order_id)

三、安全防护策略

3.1 敏感数据加密

密码存储安全处理:

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

3.2 请求速率限制

防止暴力破解攻击:

from fastapi import Request
from fastapi.middleware import Middleware


class RateLimiter:
    def __init__(self, times: int, seconds: int):
        self.times = times
        self.seconds = seconds

    async def __call__(self, request: Request):
        client_ip = request.client.host
        # 使用 Redis 记录请求次数
        current = await redis.incr(client_ip)
        if current > self.times:
            raise HTTPException(429, "Too many requests")
        await redis.expire(client_ip, self.seconds)

四、测试与调试

4.1 单元测试示例

权限验证测试用例:

from fastapi.testclient import TestClient


def test_admin_access():
    client = TestClient(app)
    # 普通用户令牌
    token = create_test_token(user_type="user")
    response = client.delete(
        "/products/123",
        headers={"Authorization": f"Bearer {token}"}
    )
    assert response.status_code == 403

    # 管理员令牌
    admin_token = create_test_token(user_type="admin")
    response = client.delete(
        "/products/123",
        headers={"Authorization": f"Bearer {admin_token}"}
    )
    assert response.status_code == 200

4.2 常见错误处理

错误 1:401 Unauthorized

{
  "detail": "Could not validate credentials"
}

解决方法

  1. 检查请求头 Authorization 格式是否正确
  2. 确认令牌未过期
  3. 验证密钥是否匹配

错误 2:403 Forbidden

{
  "detail": "Permission denied"
}

预防建议

  • 在路由处理器前添加权限检查中间件
  • 定期审查角色权限分配

课后测验

问题 1: 为什么要在密码哈希中使用 salt?
答案: Salt 随机值能有效防止彩虹表攻击,即使相同密码也会生成不同的哈希值

问题 2: 如何实现动态权限管理?
答案: 应该建立权限管理系统接口,允许管理员动态配置角色与权限的对应关系

问题 3: JWT 令牌为什么要设置过期时间?
答案: 缩短令牌有效期可以降低令牌泄露风险,建议结合 refresh token 使用

版本兼容说明

所有代码在以下环境验证通过:

  • Python 3.8+
  • FastAPI 0.95+
  • SQLAlchemy 1.4+
  • Redis 6.2+

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何用 FastAPI 和 RBAC 打造坚不可摧的电商堡垒?

往期文章归档: