实时通信的痛点
传统REST API是”你问我答”模式,但很多场景需要”服务器主动推”——聊天消息、股票行情、协同编辑、游戏同步。WebSocket是标准方案,但手写WebSocket服务要考虑:握手、心跳、断线重连、消息顺序、多端同步……
这篇文章,用MonkeyCode生成一个完整的WebSocket服务,包含前后端和断线重连机制。
需求描述(给MonkeyCode的prompt)
用Node.js + WebSocket实现一个实时聊天室,要求:
后端(Node.js + ws库):
1. WebSocket握手 + JWT认证
2. 心跳检测(ping/pong)
3. 消息广播(房间机制)
4. 消息持久化(内存存储即可)
5. 断线检测 + 自动清理
6. 速率限制(防刷屏)
7. 错误处理 + 优雅关闭
前端(原生JavaScript):
1. WebSocket连接 + JWT认证
2. 断线自动重连(指数退避)
3. 心跳维护
4. 消息队列(离线时缓存)
5. 连接状态UI提示
6. 消息历史加载
技术栈:Node.js + ws + JWT,前端不用框架
输出:完整可运行的代码,包含详细注释
MonkeyCode生成的WebSocket服务器
// server.js - MonkeyCode生成
const WebSocket = require('ws');
const http = require('http');
const jwt = require('jsonwebtoken');
const url = require('url');
const PORT = process.env.PORT || 8080;
const JWT_SECRET = process.env.JWT_SECRET || 'your-secret-key';
// 创建HTTP服务器(用于健康检查和非WS请求)
const server = http.createServer((req, res) => {
if (req.url === '/health') {
res.writeHead(200);
res.end(JSON.stringify({ status: 'ok', connections: wss.clients?.size || 0 }));
} else {
res.writeHead(404);
res.end('Not Found');
}
});
// WebSocket服务器
const wss = new WebSocket.Server({ server });
// 存储
const rooms = new Map(); // roomId -> Set<WebSocket>
const userSockets = new Map(); // userId -> Set<WebSocket>
const messageHistory = new Map(); // roomId -> [{msg}]
// 速率限制
const rateLimits = new Map(); // userId -> [timestamp, ...]
function checkRateLimit(userId, maxMsgs = 10, windowMs = 10000) {
const now = Date.now();
const userMsgs = rateLimits.get(userId) || [];
// 清除窗口外的消息
const validMsgs = userMsgs.filter(t => now - t < windowMs);
if (validMsgs.length >= maxMsgs) return false;
validMsgs.push(now);
rateLimits.set(userId, validMsgs);
return true;
}
// 认证中间件
function authenticate(ws, req) {
const params = url.parse(req.url, true).query;
const token = params.token;
if (!token) {
ws.close(1008, 'Authentication required');
return null;
}
try {
const decoded = jwt.verify(token, JWT_SECRET);
return decoded; // { userId, username }
} catch (err) {
ws.close(1008, 'Invalid token');
return null;
}
}
// 心跳检测
function heartbeat() {
this.isAlive = true;
}
// 广播消息到房间
function broadcastToRoom(roomId, message, excludeWs = null) {
const room = rooms.get(roomId);
if (!room) return;
const msgStr = JSON.stringify(message);
room.forEach(ws => {
if (ws !== excludeWs && ws.readyState === WebSocket.OPEN) {
ws.send(msgStr);
}
});
}
// 保存消息历史
function saveMessage(roomId, message) {
if (!messageHistory.has(roomId)) {
messageHistory.set(roomId, []);
}
const history = messageHistory.get(roomId);
history.push(message);
// 只保留最近100条
if (history.length > 100) {
messageHistory.set(roomId, history.slice(-100));
}
}
wss.on('connection', (ws, req) => {
const user = authenticate(ws, req);
if (!user) return;
console.log(`User connected: ${user.username} (${user.userId})`);
// 初始化
ws.isAlive = true;
ws.user = user;
ws.rooms = new Set();
// 加入用户socket映射
if (!userSockets.has(user.userId)) {
userSockets.set(user.userId, new Set());
}
userSockets.get(user.userId).add(ws);
// 心跳
ws.on('pong', heartbeat);
// 处理消息
ws.on('message', (rawMessage) => {
let message;
try {
message = JSON.parse(rawMessage);
} catch {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
return;
}
// 速率检查
if (!checkRateLimit(user.userId)) {
ws.send(JSON.stringify({ type: 'error', message: 'Rate limit exceeded' }));
return;
}
switch (message.type) {
case 'join':
handleJoinRoom(ws, message);
break;
case 'leave':
handleLeaveRoom(ws, message);
break;
case 'chat':
handleChatMessage(ws, message);
break;
case 'typing':
handleTyping(ws, message);
break;
default:
ws.send(JSON.stringify({ type: 'error', message: `Unknown message type: ${message.type}` }));
}
});
// 连接关闭
ws.on('close', () => {
console.log(`User disconnected: ${user.username}`);
// 从所有房间移除
ws.rooms.forEach(roomId => {
const room = rooms.get(roomId);
if (room) {
room.delete(ws);
broadcastToRoom(roomId, {
type: 'system',
message: `${user.username} left the room`,
timestamp: Date.now()
}, ws);
}
});
// 从用户socket映射移除
const userWsSet = userSockets.get(user.userId);
if (userWsSet) {
userWsSet.delete(ws);
if (userWsSet.size === 0) {
userSockets.delete(user.userId);
}
}
});
// 错误处理
ws.on('error', (err) => {
console.error(`WebSocket error for ${user.username}:`, err.message);
});
// 发送历史消息
ws.send(JSON.stringify({
type: 'history',
rooms: Array.from(rooms.keys()).map(roomId => ({
id: roomId,
members: rooms.get(roomId)?.size || 0,
history: messageHistory.get(roomId)?.slice(-50) || []
}))
}));
});
// 处理加入房间
function handleJoinRoom(ws, message) {
const { roomId, roomName } = message;
if (!roomId) {
ws.send(JSON.stringify({ type: 'error', message: 'roomId required' }));
return;
}
// 创建房间(如果不存在)
if (!rooms.has(roomId)) {
rooms.set(roomId, new Set());
}
const room = rooms.get(roomId);
room.add(ws);
ws.rooms.add(roomId);
// 发送历史消息
const history = messageHistory.get(roomId) || [];
ws.send(JSON.stringify({
type: 'room_history',
roomId,
history: history.slice(-50)
}));
// 广播用户加入
broadcastToRoom(roomId, {
type: 'system',
message: `${ws.user.username} joined the room`,
timestamp: Date.now()
}, ws);
ws.send(JSON.stringify({
type: 'joined',
roomId,
roomName: roomName || roomId,
members: room.size
}));
}
// 处理离开房间
function handleLeaveRoom(ws, message) {
const { roomId } = message;
if (!roomId || !rooms.has(roomId)) return;
const room = rooms.get(roomId);
room.delete(ws);
ws.rooms.delete(roomId);
broadcastToRoom(roomId, {
type: 'system',
message: `${ws.user.username} left the room`,
timestamp: Date.now()
}, ws);
}
// 处理聊天消息
function handleChatMessage(ws, message) {
const { roomId, content, id } = message;
if (!roomId || !content) {
ws.send(JSON.stringify({ type: 'error', message: 'roomId and content required' }));
return;
}
if (!rooms.has(roomId)) {
ws.send(JSON.stringify({ type: 'error', message: 'Room not found' }));
return;
}
const chatMessage = {
id: id || `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
type: 'chat',
roomId,
userId: ws.user.userId,
username: ws.user.username,
content,
timestamp: Date.now()
};
// 保存历史
saveMessage(roomId, chatMessage);
// 广播
broadcastToRoom(roomId, chatMessage, null);
}
// 处理输入状态
function handleTyping(ws, message) {
const { roomId, isTyping } = message;
if (!roomId) return;
broadcastToRoom(roomId, {
type: 'typing',
userId: ws.user.userId,
username: ws.user.username,
isTyping
}, ws);
}
// 心跳检测:每隔30秒检查一次
const heartbeatInterval = setInterval(() => {
wss.clients.forEach(ws => {
if (ws.isAlive === false) {
console.log(`Terminating dead connection: ${ws.user?.username}`);
return ws.terminate();
}
ws.isAlive = false;
ws.ping(() => {});
});
}, 30000);
// 优雅关闭
function gracefulShutdown() {
console.log('Shutting down gracefully...');
clearInterval(heartbeatInterval);
// 通知所有客户端
wss.clients.forEach(ws => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'server_shutdown' }));
ws.close(1001, 'Server shutting down');
}
});
// 等待连接关闭
setTimeout(() => {
wss.close(() => {
server.close(() => {
process.exit(0);
});
});
}, 1000);
}
process.on('SIGINT', gracefulShutdown);
process.on('SIGTERM', gracefulShutdown);
server.listen(PORT, () => {
console.log(`WebSocket server running on port ${PORT}`);
console.log(`Health check: http://localhost:${PORT}/health`);
});
MonkeyCode生成的前端WebSocket客户端
<!-- index.html + client.js - MonkeyCode生成 -->
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>实时聊天室</title>
<style>
body { font-family: sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
#status { padding: 8px; border-radius: 4px; margin-bottom: 10px; }
.connected { background: #d4edda; color: #155724; }
.disconnected { background: #f8d7da; color: #721c24; }
#messages { height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px; margin-bottom: 10px; }
.message { margin: 5px 0; }
.system { color: #666; font-style: italic; }
.my-message { text-align: right; }
.typing-indicator { color: #999; font-size: 0.9em; }
</style>
</head>
<body>
<div id="status" class="disconnected">未连接</div>
<div>
<input id="token" placeholder="JWT Token" style="width: 300px;">
<button onclick="connect()">连接</button>
<button onclick="disconnect()">断开</button>
</div>
<div style="margin: 10px 0;">
<input id="roomId" placeholder="房间ID" value="general">
<button onclick="joinRoom()">加入房间</button>
</div>
<div id="messages"></div>
<div>
<input id="msgInput" placeholder="输入消息..." style="width: 300px;" onkeypress="if(event.key==='Enter')sendMessage()">
<button onclick="sendMessage()">发送</button>
</div>
<div id="typing" class="typing-indicator"></div>
<script>
let ws = null;
let currentRoom = null;
let reconnectAttempts = 0;
let reconnectTimer = null;
let messageQueue = []; // 离线时缓存的消息
const maxReconnectDelay = 30000;
// 指数退避重连
function scheduleReconnect() {
if (reconnectAttempts >= 10) {
addMessage('system', '重连失败,请手动刷新页面');
return;
}
const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), maxReconnectDelay);
addMessage('system', `连接断开,${Math.round(delay/1000)}秒后重试...`);
reconnectTimer = setTimeout(() => {
reconnectAttempts++;
connect();
}, delay);
}
function connect() {
const token = document.getElementById('token').value;
if (!token) { alert('请输入JWT Token'); return; }
if (reconnectTimer) clearTimeout(reconnectTimer);
ws = new WebSocket(`ws://localhost:8080?token=${encodeURIComponent(token)}`);
ws.onopen = () => {
console.log('WebSocket connected');
reconnectAttempts = 0;
updateStatus(true);
// 发送离线时缓存的消息
while (messageQueue.length > 0) {
const msg = messageQueue.shift();
ws.send(JSON.stringify(msg));
}
};
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
handleMessage(message);
};
ws.onclose = (event) => {
console.log('WebSocket disconnected:', event.code, event.reason);
updateStatus(false);
if (event.code !== 1000 && event.code !== 1001) {
// 非正常关闭,尝试重连
scheduleReconnect();
}
};
ws.onerror = (err) => {
console.error('WebSocket error:', err);
};
}
function disconnect() {
if (ws) {
ws.close(1000, 'User disconnected');
ws = null;
}
}
function updateStatus(connected) {
const statusEl = document.getElementById('status');
statusEl.className = connected ? 'connected' : 'disconnected';
statusEl.textContent = connected ? '已连接' : '未连接';
}
function handleMessage(message) {
switch (message.type) {
case 'chat':
addMessage('chat', message.username, message.content, message.timestamp, message.userId === getCurrentUserId());
break;
case 'system':
addMessage('system', message.message);
break;
case 'history':
// 加载历史消息
message.rooms.forEach(room => {
room.history.forEach(msg => addMessage('chat', msg.username, msg.content, msg.timestamp));
});
break;
case 'room_history':
message.history.forEach(msg => addMessage('chat', msg.username, msg.content, msg.timestamp));
break;
case 'joined':
currentRoom = message.roomId;
addMessage('system', `已加入房间: ${message.roomName || message.roomId}`);
break;
case 'typing':
showTyping(message.username, message.isTyping);
break;
case 'error':
addMessage('system', `错误: ${message.message}`);
break;
}
}
function sendMessage() {
const input = document.getElementById('msgInput');
const content = input.value.trim();
if (!content || !ws || ws.readyState !== WebSocket.OPEN) {
if (content && ws && ws.readyState !== WebSocket.OPEN) {
// 离线时缓存消息
messageQueue.push({
type: 'chat',
roomId: currentRoom,
content,
timestamp: Date.now()
});
addMessage('system', '离线模式:消息已缓存,连接恢复后发送');
}
return;
}
const message = {
type: 'chat',
roomId: currentRoom,
content,
timestamp: Date.now(),
id: generateId()
};
ws.send(JSON.stringify(message));
input.value = '';
}
function joinRoom() {
const roomId = document.getElementById('roomId').value.trim();
if (!roomId || !ws || ws.readyState !== WebSocket.OPEN) {
alert('请先连接WebSocket');
return;
}
ws.send(JSON.stringify({ type: 'join', roomId }));
}
function addMessage(type, author, content, timestamp, isMine = false) {
const messagesEl = document.getElementById('messages');
const msgEl = document.createElement('div');
msgEl.className = `message ${type} ${isMine ? 'my-message' : ''}`;
if (type === 'system') {
msgEl.textContent = author; // author is actually the system message
} else {
const time = timestamp ? new Date(timestamp).toLocaleTimeString() : '';
msgEl.innerHTML = `<strong>${escapeHtml(author)}</strong> [${time}]: ${escapeHtml(content)}`;
}
messagesEl.appendChild(msgEl);
messagesEl.scrollTop = messagesEl.scrollHeight;
}
function showTyping(username, isTyping) {
const typingEl = document.getElementById('typing');
if (isTyping) {
typingEl.textContent = `${username} 正在输入...`;
} else {
typingEl.textContent = '';
}
}
// 输入状态通知(防抖)
let typingTimer = null;
document.getElementById('msgInput').addEventListener('input', () => {
if (!ws || ws.readyState !== WebSocket.OPEN || !currentRoom) return;
ws.send(JSON.stringify({ type: 'typing', roomId: currentRoom, isTyping: true }));
clearTimeout(typingTimer);
typingTimer = setTimeout(() => {
ws.send(JSON.stringify({ type: 'typing', roomId: currentRoom, isTyping: false }));
}, 1000);
});
function generateId() {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
function getCurrentUserId() {
// 从JWT解析(简化版)
const token = document.getElementById('token').value;
try {
const payload = JSON.parse(atob(token.split('.')[1]));
return payload.userId;
} catch {
return null;
}
}
function escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
</script>
</body>
</html>
关键技术点解析
1. 心跳机制(Heartbeat)
WebSocket连接可能因为网络问题静默断开(”半开连接”)。解决方法:
// 服务端:每30秒ping一次
setInterval(() => {
wss.clients.forEach(ws => {
if (ws.isAlive === false) return ws.terminate();
ws.isAlive = false;
ws.ping(() => {});
});
}, 30000);
// 客户端:响应pong
ws.on('pong', () => { ws.isAlive = true; });
2. 指数退避重连
网络抖动时,如果立刻重连会雪崩。正确做法:
let reconnectAttempts = 0;
function scheduleReconnect() {
const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000);
setTimeout(() => {
reconnectAttempts++;
connect();
}, delay);
}
退避序列:1秒 → 2秒 → 4秒 → 8秒 → … → 最长30秒。
3. 消息队列(离线缓存)
用户网络断开时,发送的消息不能丢失:
let messageQueue = [];
function sendMessage(content) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'chat', content }));
} else {
// 离线:缓存
messageQueue.push({ type: 'chat', content });
}
}
// 重连成功后发送缓存
ws.onopen = () => {
while (messageQueue.length > 0) {
ws.send(JSON.stringify(messageQueue.shift()));
}
};
4. 速率限制(防刷屏)
const rateLimits = new Map();
function checkRateLimit(userId) {
const now = Date.now();
const userMsgs = (rateLimits.get(userId) || [])
.filter(t => now - t < 10000); // 10秒窗口
if (userMsgs.length >= 10) return false; // 10秒内最多10条
userMsgs.push(now);
rateLimits.set(userId, userMsgs);
return true;
}
运行和测试
# 安装依赖
npm install ws jsonwebtoken
# 启动服务器
JWT_SECRET=mysecret PORT=8080 node server.js
# 生成测试JWT(另一个终端)
node -e "const jwt = require('jsonwebtoken'); console.log(jwt.sign({userId:'u1',username:'test'}, 'mysecret', {expiresIn:'1h'}))"
# 复制输出的token,粘贴到前端页面的token输入框,点击"连接"
用两个浏览器标签打开index.html,分别用不同用户登录,就能实时聊天了。
用MonkeyCode扩展功能
当前实现是基础版,可以继续让MonkeyCode添加:
- 私聊功能:
/msg @username 内容 - 消息回执:已送达/已读状态
- 房间权限:密码房间、管理员
- 消息搜索:在服务器端实现全文搜索
- 文件传输:用WebSocket发送二进制数据
只需要在初始prompt里加上这些需求,MonkeyCode会生成对应的代码。
WebSocket看起来复杂,但用MonkeyCode拆解成”认证→心跳→房间→消息→重连”几个模块后,每个部分都是几十行代码。关键是需求描述要结构化,不要说”帮我写个聊天室”,而要说”帮我实现WebSocket服务器,包含认证、心跳、房间、速率限制”。
文章摘自:https://www.cnblogs.com/jaryn/p/20218876
