FastAPI WebSocket:你的双向通信通道为何如此丝滑?



扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长

发现1000+提升效率与开发的AI工具和实用程序https://tools.cmdragon.cn/

# 示例代码运行环境
# Python 3.8+
# 安装依赖:pip install fastapi==0.68.0 uvicorn==0.15.0 websockets==10.3 pydantic==1.10.7

一、WebSocket路由声明规范

1.1 基础路由定义

使用@app.websocket装饰器声明WebSocket路由:

from fastapi import FastAPI, WebSocket

app = FastAPI()


@app.websocket("/ws/chat/{room_id}")
async def websocket_chat(websocket: WebSocket, room_id: int):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Room {room_id}: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

关键要点解析:

  • 路径参数通过URL直接传递(如{room_id})
  • WebSocket对象自动注入到路由函数
  • 必须显式调用await websocket.accept()建立连接
  • receive_text()和send_text()实现双向通信

1.2 路由参数验证

结合路径参数与查询参数进行复合验证:

from fastapi import Query, WebSocket


@app.websocket("/ws/secure/{client_id}")
async def secure_ws(
        websocket: WebSocket,
        client_id: int = Path(..., gt=0),
        token: str = Query(..., min_length=8)
):
    if not validate_token(token):
        await websocket.close(code=1008)
        return
    # ...连接处理逻辑...

二、客户端连接建立与握手验证

2.1 握手过程控制

自定义握手验证流程:

@app.websocket("/ws/auth")
async def auth_ws(websocket: WebSocket):
    # 手动控制握手过程
    await websocket.accept()

    # 获取握手时的请求头
    headers = websocket.headers
    auth_token = headers.get("authorization")

    if not verify_jwt_token(auth_token):
        await websocket.close(code=1008, reason="Invalid credentials")
        return

    # 验证通过后的处理逻辑

2.2 验证失败处理模式

graph TD A[客户端请求连接] –> B{验证Headers} B — 成功 –> C[返回101状态码] B — 失败 –> D[返回403状态码] C –> E[建立双向通信] D –> F[关闭TCP连接]

三、连接状态维护与生命周期管理

3.1 连接状态跟踪

使用字典维护活跃连接:

from typing import Dict

active_connections: Dict[str, WebSocket] = {}


@app.websocket("/ws/status")
async def status_ws(websocket: WebSocket):
    await websocket.accept()
    client_id = str(websocket.client)
    active_connections[client_id] = websocket

    try:
        while True:
            # 保持连接活跃
            await websocket.receive_text()
    except WebSocketDisconnect:
        del active_connections[client_id]

3.2 心跳检测机制

import asyncio


async def heartbeat(websocket: WebSocket, interval: int = 30):
    try:
        while True:
            await asyncio.sleep(interval)
            await websocket.send_json({
                "type": "heartbeat",
                "timestamp": time.time()
            })
    except WebSocketDisconnect:
        print("Heartbeat terminated")


@app.websocket("/ws/realtime")
async def realtime_ws(websocket: WebSocket):
    await websocket.accept()
    # 启动独立心跳任务
    heartbeat_task = asyncio.create_task(heartbeat(websocket))

    try:
        # 主消息处理循环
        while True:
            data = await websocket.receive_json()
            # 处理业务逻辑...
    finally:
        heartbeat_task.cancel()
        await websocket.close()

课后 Quiz

  1. 当客户端发送非文本格式数据时如何处理?
# 正确处理方法
try:
    data = await websocket.receive_json()
except WebSocketDisconnect:
# 处理断开逻辑
except ValueError:
    await websocket.send_text("ERROR: 仅支持JSON格式")

# 错误处理方法:直接使用未校验的receive()
  1. 如何获取客户端的真实IP地址?
# 正确方法
client_ip = websocket.client.host

# 常见错误:直接读取X-Forwarded-For头
# 需要配合代理设置处理

常见报错解决方案

错误1:403 Handshake Failed

现象:客户端连接时立即断开
分析:未通过握手验证,常见于:

  • 缺少必要认证头
  • 验证逻辑返回False后未及时close()
    解决
# 在拒绝连接时立即关闭
if not valid:
    await websocket.close(code=1008)
    return  # 必须立即返回

错误2:1006 Abnormal Closure

现象:客户端非正常断开
处理方案

try:
    while True:
        data = await websocket.receive_text()
except WebSocketDisconnect as e:
    print(f"断开代码:{e.code}")

错误3:TypeError: ‘str’ expected

场景:发送非文本数据时
正确做法

# 发送二进制数据
await websocket.send_bytes(binary_data)

# 发送文本数据
await websocket.send_text("message")

# 发送JSON
await websocket.send_json({"key": "value"})

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何在FastAPI中玩转WebSocket,让实时通信不再烦恼?

往期文章归档:

免费好用的热门在线工具