基于 SocketIO 消息协议设计报文规范,构建FastAPI上的SocketIO 应用


最近在研究Python下整合FastAPI的Socket.IO 应用,对于其WebSocket的消息报文协议进行了深入了解,并整理了相关的协议内容,整合到FastAPI的WebSocket通讯处理中,用作多端的消息通讯,如聊天,系统信息通知等。

1. 总体设计

  • 统一事件名:客户端和服务端只监听/发送一个事件,例如:”message”。
  • 消息类型区分:通过 msgtype 字段区分不同的业务逻辑。
  • 通用字段:每条消息都必须包含的基础字段(如 msgtype, sender, timestamp)。
  • 扩展字段:放在 payload 里,根据不同类型自由定义。
  • 报文格式:统一为 JSON 格式。
  • 广播机制:服务端可以广播消息到所有客户端,方便消息的实时推送。
  • 房间机制:服务端可以创建、加入或离开房间,方便消息的分发。

2. 消息格式

{
  "msgtype": "string",        // 消息类型,如 "chat", "notification", "command"
  "sender": "string",         // 发送者标识,如用户ID或系统标识
  "timestamp": "number",      // 消息发送时间戳(毫秒)
  "payload": {                // 消息内容(不同 msgtype 有不同格式)
    // 根据 msgtype 定义不同的结构
  },

  "room": "string",          // 房间标识(可选),表示消息所属的房间
  "touser": "string",       // 接收者标识(可选),表示消息的目标用户,多个接收者用‘|’分隔
  "totag": "string",        // 接收者标签(可选),表示消息的目标用户标签,多个标签用‘|’分隔
}

例子:

{
  "msgtype": "chat",
  "room": "room1",
  "sender": "user1",
  "timestamp": 1612345678901,
  "payload": {
    "text": "Hello, world!"
  }
}

3. 消息类型

3.1 系统类消息(system)

系统类消息由系统发送,用于通知客户端或服务端状态变化。

{
  "msgtype": "system_notice",
  "sender": "system",
  "timestamp": 1694412000000,
  "payload": {
    "level": "info",   // info, warning, error
    "text": "服务器即将维护"
  }
}

3.2 聊天类消息(chat)

聊天类消息由用户发送,用于聊天。

{
  "msgtype": "chat_message",
  "room": null,
  "sender": "alice",
  "timestamp": 1694412000000,
  "payload": {
    "text": "你好,大家!",
    "extra": {
      "emoji": ""
    }
  }
}


3.3 房间操作类消息(room)

房间操作类消息用于创建、加入或离开房间。

{
  "msgtype": "room_event",
  "room": "room1",
  "sender": "bob",
  "timestamp": 1694412000000,
  "payload": {
    "action": "join",   // join, leave, create, destroy
    "user": "bob"
  }
}

3.4 ACK / 状态反馈消息(ack)

ACK / 状态反馈消息用于确认客户端或服务端收到消息。

{
  "msgtype": "ack",
  "sender": "server",
  "timestamp": 1694412000000,
  "payload": {
    "status": "ok",     // ok, fail
    "requestId": "abc123",
    "msg": "消息已送达"
  }
}

3.5 命令类消息(command)

命令类消息用于向服务端发送指令。

{
  "msgtype": "command",
  "sender": "user1",
  "timestamp": 1694412000000,
  "payload": {
    "command": "create_room",
    "args": {
      "name": "room1"
    }
  }
}

3.6 图片类消息(image)

图片类消息用于发送图片。

{
  "msgtype": "image",
  "room": "room1",
  "sender": "user1",
  "timestamp": 1694412000000,
  "payload": {
    "url": "https://example.com/image.jpg"
  }
}

3.7 文件类消息(file)

文件类消息用于发送文件。

{
  "msgtype": "file",
  "room": "room1",
  "sender": "user1",
  "timestamp": 1694412000000,
  "payload": {
    "url": "https://example.com/file.pdf"
  }
}   

3.8 语音类消息(voice)

3.9 视频类消息(video)

3.10 位置类消息(location)

3.11 其他类型消息(other)

3.12 自定义消息(custom)

4、Python 服务端示例

import time
import socketio

sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")

def make_message(msg_type, sender, payload, room=None):
    return {
        "msgtype": msg_type,
        "room": room,
        "sender": sender,
        "timestamp": int(time.time() * 1000),
        "payload": payload
    }

@sio.event
async def chat_message(sid, data):
    msg = make_message("chat_message", sid, {"text": data})
    await sio.emit("message", msg)  # 广播

@sio.event
async def join_room(sid, room):
    sio.enter_room(sid, room)
    msg = make_message("room_event", "system", {"action": "join", "user": sid}, room)
    await sio.emit("message", msg, room=room)

5、Vue3 前端示例

socket.on("message", (msg) => {
  switch (msg.msgtype) {
    case "chat_message":
      console.log(`[${msg.sender}] 说: ${msg.payload.text}`);
      break;

    case "system_notice":
      console.log(`系统通知: ${msg.payload.text}`);
      break;

    case "room_event":
      console.log(`房间 ${msg.room} 事件: ${msg.payload.action} -> ${msg.payload.user}`);
      break;

    case "ack":
      console.log(`ACK: ${msg.payload.status} (${msg.payload.msg})`);
      break;

    default:
      console.warn("未知消息类型:", msg);
  }
});

6、通信流程示例

6.1 普通消息

  • 客户端发送:
{
  "msgtype": "chat_message",
  "sender": "alice",
  "timestamp": 1694412000000,
  "payload": {
    "text": "Hello World"
  }
}

  • 服务端广播:
{
  "msgtype": "chat_message",
  "sender": "alice",
  "timestamp": 1694412000500,
  "payload": {
    "text": "Hello World"
  }
}

6.2 房间消息

6.2.1 创建房间

  1. 客户端发送:
{
  "msgtype": "command",
  "sender": "bob",
  "timestamp": 1694412000000,
  "payload": {
    "command": "create_room",
    "args": {
      "name": "room1"
    }
  }
}

2. 服务端响应:

```json
{
  "msgtype": "ack",
  "sender": "server",
  "timestamp": 1694412000000,
  "payload": {
    "status": "ok",
    "requestId": "abc123",
    "msg": "房间 room1 创建成功"
  }
}

6.2.2 加入房间

1.客户端发送:

{
  "msgtype": "room_event",
  "sender": "alice",
  "timestamp": 1694412000000,
  "payload": {
    "action": "join",
    "user": "alice"
  }
}

2.服务端响应:

{
  "msgtype": "ack",
  "sender": "server",
  "timestamp": 1694412000000,
  "payload": {
    "status": "ok",
    "requestId": "abc123",
    "msg": "用户 alice 加入房间 room1"
  }
}

3.服务端广播:

{
  "msgtype": "room_event",
  "room": "room1",
  "sender": "server",
  "timestamp": 1694412000000,
  "payload": {
    "action": "join",
    "user": "alice"
  }
}

6.2.3 离开房间

1.客户端发送:

{
  "msgtype": "room_event",
  "sender": "alice",
  "timestamp": 1694412000000,
  "payload": {
    "action": "leave",
    "user": "alice"
  }
}

2.服务端响应:

{
  "msgtype": "ack",
  "sender": "server",
  "timestamp": 1694412000000,
  "payload": {
    "status": "ok",
    "requestId": "abc123",
    "msg": "用户 alice 离开房间 room1"
  }
}

7、优点

  • 统一事件名:客户端和服务端只监听/发送一个事件,降低耦合度,提高代码可读性。

  • 消息类型区分:通过 msgtype 字段区分不同的业务逻辑,提高消息处理效率。

  • 通用字段:每条消息都必须包含的基础字段(如 msgtype, sender, timestamp),方便服务端处理。

  • 扩展字段:放在 payload 里,根据不同类型自由定义,方便消息内容的扩展。

  • 状态反馈消息:ACK / 状态反馈消息用于确认客户端或服务端收到消息,方便客户端处理。

  • 命令类消息:命令类消息用于向服务端发送指令,方便服务端处理。

  • 广播机制:服务端可以广播消息到所有客户端,方便消息的实时推送。

  • 房间机制:服务端可以创建、加入或离开房间,方便消息的分发。

  • 扩展性:服务端和客户端可以自由扩展消息类型,增加更多的业务逻辑。

  • 兼容性:服务端和客户端可以兼容 SocketIO 协议,方便与其他服务端和客户端通信。

  • 易用性:服务端和客户端可以直接使用 SocketIO 库,降低学习成本。

  • 性能:服务端和客户端可以利用 WebSocket 协议,提高消息传输效率。

8、关于 Socket.IO的挂载的注意事项

Socket.IO 是一个基于 WebSocket 的实时通信协议,它可以实现浏览器和服务器之间的双向通信。
Socket.IO 协议的实现主要依赖于两个部分:

  • Socket.IO 服务器:Socket.IO 服务器负责维护连接,处理消息,并向客户端推送消息。
  • Socket.IO 客户端:Socket.IO 客户端负责维护连接,向 Socket.IO 服务器发送消息,并接收消息。

如果你用 app = register_app(),然后用 uvicorn main:app 或者 config = uvicorn.Config(app, …),这样启动的“app”是原生 FastAPI 实例,不包含 Socket.IO 的功能,所以前端 Socket.IO 客户端连接时不会触发 @sio.event 的回调!

解决方法:
你需要让 Uvicorn 启动的是 ASGI 的组合应用,即 sio_app,而不是纯 FastAPI 的 app。


import socketio
from fastapi import FastAPI

def register_app():
    app = FastAPI()
    # ...注册路由等...
    return app

app = register_app()
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
sio_app = socketio.ASGIApp(sio, other_asgi_app=app)

# main.py 入口
if __name__ == "__main__":
    import uvicorn
    # 下面两种写法都可以
    uvicorn.run(sio_app, host="0.0.0.0", port=8000)
    # 或
    # config = uvicorn.Config(sio_app, host="0.0.0.0", port=8000)
    # server = uvicorn.Server(config)
    # server.run()

注意:一定要启动的是 sio_app,不是 app!
你可以继续用 register_app() 组织你的 FastAPI 配置,但最终给 Uvicorn 的必须是 sio_app。

最后我们来看看再FastAPI项目上对接的例子通讯截图(尚未完工,持续改进优化中)。

服务端的信息截图

客户端信息截图