自己写登录系统是危险的。OAuth2 + JWT是最成熟的组合,MonkeyCode帮你把安全风险降到最低。
为什么不用Session+Cookie了?
传统Session方案的痛点:
| 问题 | 表现 |
|---|---|
| 扩展性差 | Session存在单台服务器内存,多实例无法共享 |
| CSRF风险 | Cookie自动携带,容易被恶意网站利用 |
| 跨域麻烦 | Cookie在跨域场景下各种限制 |
| 移动端不友好 | App/小程序很难处理Cookie |
JWT + OAuth2 是无状态、跨平台、支持SSO的现代方案。
OAuth2四种授权模式
MonkeyCode帮你选最合适的:
| 模式 | 适用场景 | 典型用户 |
|---|---|---|
| 授权码模式 | 有后端的Web应用(最安全) | 你的SaaS平台 |
| 隐式模式 | 纯前端SPA(已淘汰,不推荐) | — |
| 密码模式 | 高度信任的第一方App | 你自己的官方App |
| 客户端凭证模式 | 服务间调用 | 微服务A调用微服务B |
99%的Web应用应该用:授权码模式 + PKCE(防止授权码被截获)。
实战:用Authlib实现OAuth2 + JWT
安装依赖
让MonkeyCode生成requirements.txt:
authlib==1.3.0
fastapi==0.104.0
pydantic==2.5.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
完整实现(授权码模式 + JWT)
# app/auth.py - 完整认证模块
from fastapi import FastAPI, Depends, HTTPException, Request, status
from authlib.integrations.starlette import OAuth
from starlette.middleware.sessions import SessionMiddleware
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional, List
import os
app = FastAPI(title="OAuth2 JWT Demo")
app.add_middleware(SessionMiddleware, secret_key=os.getenv("SESSION_SECRET", "dev-secret"))
# ─── 密码哈希 ───
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
# ─── JWT工具 ───
SECRET_KEY = os.getenv("JWT_SECRET", "dev-jwt-secret-change-in-prod")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> dict:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
raise HTTPException(status_code=401, detail="Token无效或已过期")
# ─── OAuth2第三方登录(GitHub示例)───
oauth = OAuth()
oauth.register(
name="github",
client_id=os.getenv("GITHUB_CLIENT_ID"),
client_secret=os.getenv("GITHUB_CLIENT_SECRET"),
access_token_url="https://github.com/login/oauth/access_token",
access_token_params=None,
authorize_url="https://github.com/login/oauth/authorize",
authorize_params=None,
api_base_url="https://api.github.com/",
client_kwargs={"scope": "user:email"},
)
@app.get("/auth/github/login")
async def github_login(request: Request):
redirect_uri = request.url_for("github_callback")
return await oauth.github.authorize_redirect(request, redirect_uri)
@app.get("/auth/github/callback")
async def github_callback(request: Request):
token = await oauth.github.authorize_access_token(request)
user_info = await oauth.github.get("user", token=token)
user_email = user_info.json().get("email")
# 查找或创建用户
user = await db.get_user_by_email(user_email)
if not user:
user = await db.create_user(email=user_email, name=user_info.json()["login"])
# 签发JWT
access_token = create_access_token({"sub": str(user.id), "email": user.email})
refresh_token = create_refresh_token({"sub": str(user.id)})
# 重定向回前端,把token放在URL fragment(不会发到后端)
frontend_url = os.getenv("FRONTEND_URL", "http://localhost:3000")
return RedirectResponse(
url=f"{frontend_url}/auth/callback?access_token={access_token}&refresh_token={refresh_token}"
)
# ─── 本地注册/登录 ───
from pydantic import BaseModel
class RegisterRequest(BaseModel):
email: str
password: str
name: str
class LoginRequest(BaseModel):
email: str
password: str
@app.post("/auth/register")
async def register(req: RegisterRequest):
existing = await db.get_user_by_email(req.email)
if existing:
raise HTTPException(400, "邮箱已注册")
hashed = hash_password(req.password)
user = await db.create_user(email=req.email, name=req.name, password_hash=hashed)
access_token = create_access_token({"sub": str(user.id), "email": user.email})
refresh_token = create_refresh_token({"sub": str(user.id)})
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60
}
@app.post("/auth/login")
async def login(req: LoginRequest):
user = await db.get_user_by_email(req.email)
if not user or not verify_password(req.password, user.password_hash):
raise HTTPException(401, "邮箱或密码错误")
access_token = create_access_token({"sub": str(user.id), "email": user.email})
refresh_token = create_refresh_token({"sub": str(user.id)})
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60
}
# ─── 刷新Token ───
class RefreshRequest(BaseModel):
refresh_token: str
@app.post("/auth/refresh")
async def refresh(req: RefreshRequest):
payload = decode_token(req.refresh_token)
if payload.get("type") != "refresh":
raise HTTPException(401, "无效的refresh token")
new_access_token = create_access_token({"sub": payload["sub"]})
return {"access_token": new_access_token, "token_type": "bearer"}
# ─── 获取当前用户(依赖注入)───
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user(cred: HTTPAuthorizationCredentials = Depends(security)) -> dict:
payload = decode_token(cred.credentials)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(401, "无效的token")
user = await db.get_user_by_id(int(user_id))
if not user:
raise HTTPException(401, "用户不存在")
return user
# ─── 受保护的接口 ───
@app.get("/api/me")
async def get_me(current_user: dict = Depends(get_current_user)):
return {
"id": current_user["id"],
"email": current_user["email"],
"name": current_user["name"]
}
@app.get("/api/protected")
async def protected_route(current_user: dict = Depends(get_current_user)):
return {"message": f"你好,{current_user['name']}!"}
前端如何携带JWT
// 登录后存储token
const login = async (email, password) => {
const resp = await fetch("http://localhost:8000/auth/login", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ email, password })
});
const data = await resp.json();
localStorage.setItem("access_token", data.access_token);
localStorage.setItem("refresh_token", data.refresh_token);
};
// 请求拦截器:自动附加token
const apiFetch = async (url, options = {}) => {
const token = localStorage.getItem("access_token");
const headers = {
...options.headers,
"Authorization": `Bearer ${token}`
};
let resp = await fetch(url, { ...options, headers });
// Token过期,尝试刷新
if (resp.status === 401) {
const refreshed = await refreshToken();
if (refreshed) {
const newToken = localStorage.getItem("access_token");
headers["Authorization"] = `Bearer ${newToken}`;
resp = await fetch(url, { ...options, headers });
}
}
return resp;
};
const refreshToken = async () => {
const refreshToken = localStorage.getItem("refresh_token");
const resp = await fetch("http://localhost:8000/auth/refresh", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ refresh_token: refreshToken })
});
if (resp.ok) {
const data = await resp.json();
localStorage.setItem("access_token", data.access_token);
return true;
}
// Refresh token也过期,跳转登录
localStorage.clear();
window.location.href = "/login";
return false;
};
PKCE:为什么授权码模式需要它?
传统授权码模式有个漏洞:授权码在回调URL中,可能被恶意App截获(Android/iOS的Intent劫持)。
PKCE(Proof Key for Code Exchange) 解决这个问题:
前端生成 code_verifier(随机字符串)
↓ hash → code_challenge
↓ 把 code_challenge 发给授权服务器
授权服务器返回授权码
↓ 前端带着 授权码 + code_verifier 换token
服务器验证 hash(code_verifier) == code_challenge?
通过才发token
MonkeyCode生成的PKCE实现:
import hashlib
import base64
import secrets
def generate_pkce_pair():
"""生成code_verifier和code_challenge"""
code_verifier = base64.urlsafe_b64encode(
secrets.token_bytes(32)
).decode().rstrip("=")
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).decode().rstrip("=")
return code_verifier, code_challenge
# 前端登录时
verifier, challenge = generate_pkce_pair()
session["pkce_verifier"] = verifier # 存在session
redirect_url = f"https://auth-server/oauth/authorize?...&code_challenge={challenge}&code_challenge_method=S256"
# 回调时
stored_verifier = session.pop("pkce_verifier")
# 用 verifier 换token,服务器会验证
安全最佳实践(MonkeyCode自动检查)
让MonkeyCode检查我的认证代码的安全性,列出所有风险点
| 检查项 | 风险 | 修复 |
|---|---|---|
| JWT Secret硬编码 | Token可被伪造 | 用环境变量,长度≥32字符 |
| 没用HTTPS | Token被中间人截获 | 生产环境强制HTTPS |
| access token过期时间太长 | 泄露后影响大 | ≤30分钟,用refresh token续期 |
| 没做登出黑名单 | token被盗用无法撤销 | 登出时把token加入黑名单(Redis) |
| 没限制登录尝试 | 容易被暴力破解 | 登录失败5次锁定15分钟 |
| 密码强度不够 | 弱密码被撞库 | 注册时强制8位+大小写+数字 |
登出 + Token黑名单
JWT是无状态的,服务器端无法主动撤销。解决方案:
import redis.asyncio as redis
r = await redis.from_url("redis://localhost:6379")
async def logout(token: str, current_user: dict = Depends(get_current_user)):
# 把token加入黑名单(TTL设为剩余有效期)
payload = decode_token(token)
exp = payload.get("exp")
ttl = exp - int(datetime.utcnow().timestamp())
if ttl > 0:
await r.setex(f"bl:{token}", ttl, "1")
return {"message": "登出成功"}
# 在get_current_user中检查黑名单
async def get_current_user(cred: HTTPAuthorizationCredentials = Depends(security)):
payload = decode_token(cred.credentials)
# 检查黑名单
is_blacklisted = await r.exists(f"bl:{cred.credentials}")
if is_blacklisted:
raise HTTPException(401, "Token已失效,请重新登录")
# ... 后续逻辑
MonkeyCode Prompt模板
用FastAPI + Authlib + JWT实现完整的OAuth2认证系统,包含:
1. 本地注册/登录(密码bcrypt哈希)
2. GitHub/GitLab第三方登录(OAuth2授权码模式+PKCE)
3. JWT access token(30分钟)+ refresh token(7天)
4. Token刷新接口
5. 登出 + Redis黑名单
6. 登录频率限制(5次失败锁定15分钟)
7. 密码强度校验(注册时)
8. 生成单元测试(pytest)
总结
OAuth2 + JWT的正确姿势:
- 授权码模式 + PKCE:最安全,适用于所有Web和移动端
- 短有效期access token + refresh token:平衡安全和用户体验
- Token黑名单:解决JWT无法主动撤销的问题
- HTTPS + 环境变量存Secret:生产环境安全底线
MonkeyCode能帮你从0生成完整认证代码,并自动检查常见安全漏洞。记住:认证模块不要自己手写,用成熟的库(Authlib/python-jose)。
文章摘自:https://www.cnblogs.com/jaryn/p/20224109
