MonkeyCode搞定WebSocket实时通信:从握手到断线重连的完整实现

实时通信的痛点

传统REST API是“你问我答”模式,但很多场景需要“服务器主动推”——聊天消息、股票行情、协同编辑、游戏同步。WebSocket是标准方案,但手写WebSocket服务要考虑:握手、心跳、断线重连、消息顺序、多端同步……

这篇文章,用MonkeyCode生成一个完整的WebSocket服务,包含前后端和断线重连机制。

需求描述(给MonkeyCode的prompt)

用Node.js + WebSocket实现一个实时聊天室,要求:

后端(Node.js + ws库):
1. WebSocket握手 + JWT认证
2. 心跳检测(ping/pong)
3. 消息广播(房间机制)
4. 消息持久化(内存存储即可)
5. 断线检测 + 自动清理
6. 速率限制(防刷屏)
7. 错误处理 + 优雅关闭

前端(原生JavaScript):
1. WebSocket连接 + JWT认证
2. 断线自动重连(指数退避)
3. 心跳维护
4. 消息队列(离线时缓存)
5. 连接状态UI提示
6. 消息历史加载

技术栈:Node.js + ws + JWT,前端不用框架
输出:完整可运行的代码,包含详细注释

MonkeyCode生成的WebSocket服务器

// server.js - MonkeyCode生成
const WebSocket = require('ws');
const http = require('http');
const jwt = require('jsonwebtoken');
const url = require('url');

// Listening port; falls back to 8080 when the PORT environment variable is unset or empty.
const PORT = process.env.PORT || 8080;
// Secret used to verify incoming JWTs.
// NOTE(review): when JWT_SECRET is unset the hard-coded fallback below is used, and anyone who
// knows it can forge valid tokens — always set JWT_SECRET outside of local experiments.
const JWT_SECRET = process.env.JWT_SECRET || 'your-secret-key';

// HTTP server: answers the /health probe and rejects every other non-WebSocket request.
// The handler only runs after module initialisation, so referencing `wss` (declared below) is safe.
const server = http.createServer((req, res) => {
  if (req.url === '/health') {
    // Label the body as JSON so probes and browsers parse it correctly
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ status: 'ok', connections: wss.clients?.size || 0 }));
  } else {
    res.writeHead(404, { 'Content-Type': 'text/plain' });
    res.end('Not Found');
  }
});

// WebSocket server bound to the HTTP server created above
const wss = new WebSocket.Server({ server });

// In-memory state
const rooms = new Map(); // roomId -> Set<WebSocket>
const userSockets = new Map(); // userId -> Set<WebSocket>
const messageHistory = new Map(); // roomId -> array of stored chat message objects

// Rate limiting
const rateLimits = new Map(); // userId -> [timestamp, ...]

/**
 * Sliding-window rate limiter keyed by user id.
 *
 * @param {string} userId - key under which send timestamps are tracked in `rateLimits`
 * @param {number} [maxMsgs=10] - messages allowed inside one window
 * @param {number} [windowMs=10000] - window length in milliseconds
 * @returns {boolean} true when the message is accepted (and recorded), false when the limit is hit
 */
function checkRateLimit(userId, maxMsgs = 10, windowMs = 10000) {
  const currentTime = Date.now();

  // Keep only the timestamps that still fall inside the window
  const recent = [];
  for (const sentAt of rateLimits.get(userId) || []) {
    if (currentTime - sentAt < windowMs) {
      recent.push(sentAt);
    }
  }

  // A rejected message is not recorded, so the stored list is left untouched
  if (recent.length >= maxMsgs) {
    return false;
  }

  recent.push(currentTime);
  rateLimits.set(userId, recent);
  return true;
}

/**
 * Authenticate a new connection from the `token` query parameter of the upgrade request.
 *
 * On failure the socket is closed with code 1008 and null is returned, so callers only
 * need to check the return value.
 *
 * @param {WebSocket} ws - the freshly accepted socket
 * @param {http.IncomingMessage} req - the upgrade request; only `req.url` is read
 * @returns {object|null} the decoded JWT payload, or null when authentication failed
 */
function authenticate(ws, req) {
  let token = null;
  try {
    // req.url is path + query only, so a placeholder base is needed to parse it.
    // The WHATWG URL API replaces the deprecated url.parse(); searchParams.get() always
    // yields a single string (the first value) even if `token` is repeated.
    token = new URL(req.url, 'http://localhost').searchParams.get('token');
  } catch {
    // An unparsable request URL is treated the same as a missing token
    token = null;
  }

  if (!token) {
    ws.close(1008, 'Authentication required');
    return null;
  }

  try {
    return jwt.verify(token, JWT_SECRET);
  } catch (err) {
    ws.close(1008, 'Invalid token');
    return null;
  }
}

// Heartbeat handler. It is registered as a socket's 'pong' listener, so `this` is the
// socket that answered the ping; marking it alive spares it from the next liveness sweep.
function heartbeat() {
  this.isAlive = true;
}

/**
 * Send a message to every open socket in a room.
 *
 * @param {*} roomId - key into `rooms`; unknown rooms are ignored
 * @param {object} message - payload, serialized once with JSON.stringify
 * @param {WebSocket|null} [excludeWs=null] - socket to skip (typically the sender)
 */
function broadcastToRoom(roomId, message, excludeWs = null) {
  const members = rooms.get(roomId);
  if (!members) return;

  const payload = JSON.stringify(message);
  for (const socket of members) {
    if (socket === excludeWs) continue;
    // Skip sockets that are still connecting or already closing/closed
    if (socket.readyState !== WebSocket.OPEN) continue;
    socket.send(payload);
  }
}

/**
 * Append a message to a room's in-memory history, keeping only the latest 100 entries.
 *
 * @param {*} roomId - key into `messageHistory`
 * @param {object} message - message to store
 */
function saveMessage(roomId, message) {
  if (!messageHistory.has(roomId)) {
    messageHistory.set(roomId, []);
  }

  const entries = messageHistory.get(roomId);
  entries.push(message);
  if (entries.length <= 100) return;

  // Over the cap: replace the stored list with its newest 100 entries
  messageHistory.set(roomId, entries.slice(-100));
}

// Per-connection setup: authenticate, register the socket and attach its event handlers.
wss.on('connection', (ws, req) => {
  const user = authenticate(ws, req);
  if (!user) return; // authenticate() has already closed the socket

  console.log(`User connected: ${user.username} (${user.userId})`);

  // Per-socket state
  ws.isAlive = true;
  ws.user = user;
  ws.rooms = new Set();

  // Register the socket under its user id (a user may hold several sockets)
  if (!userSockets.has(user.userId)) {
    userSockets.set(user.userId, new Set());
  }
  userSockets.get(user.userId).add(ws);

  // Heartbeat: a pong marks the socket as alive again
  ws.on('pong', heartbeat);

  // Incoming messages
  ws.on('message', (rawMessage) => {
    let message;
    try {
      message = JSON.parse(rawMessage);
    } catch {
      ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
      return;
    }

    // Valid JSON is not necessarily an object: a bare `null` would make `message.type`
    // throw inside this listener, and an uncaught exception here takes the process down.
    if (message === null || typeof message !== 'object' || Array.isArray(message)) {
      ws.send(JSON.stringify({ type: 'error', message: 'Message must be a JSON object' }));
      return;
    }

    // Rate limiting applies to every message type, including 'typing'
    if (!checkRateLimit(user.userId)) {
      ws.send(JSON.stringify({ type: 'error', message: 'Rate limit exceeded' }));
      return;
    }

    switch (message.type) {
      case 'join':
        handleJoinRoom(ws, message);
        break;
      case 'leave':
        handleLeaveRoom(ws, message);
        break;
      case 'chat':
        handleChatMessage(ws, message);
        break;
      case 'typing':
        handleTyping(ws, message);
        break;
      default:
        ws.send(JSON.stringify({ type: 'error', message: `Unknown message type: ${message.type}` }));
    }
  });

  // Connection closed: undo everything registered above
  ws.on('close', () => {
    console.log(`User disconnected: ${user.username}`);
    // Remove the socket from every room it joined and tell the remaining members
    ws.rooms.forEach(roomId => {
      const room = rooms.get(roomId);
      if (room) {
        room.delete(ws);
        broadcastToRoom(roomId, {
          type: 'system',
          message: `${user.username} left the room`,
          timestamp: Date.now()
        }, ws);
      }
    });
    // Remove the socket from the user -> sockets index, dropping the entry once it is empty
    const userWsSet = userSockets.get(user.userId);
    if (userWsSet) {
      userWsSet.delete(ws);
      if (userWsSet.size === 0) {
        userSockets.delete(user.userId);
      }
    }
  });

  // Socket-level errors are logged; the 'close' handler above performs the cleanup
  ws.on('error', (err) => {
    console.error(`WebSocket error for ${user.username}:`, err.message);
  });

  // Initial snapshot: every known room with its member count and latest 50 messages.
  // NOTE(review): this exposes the history of rooms the user has not joined — confirm
  // that is intended before adding private rooms.
  ws.send(JSON.stringify({
    type: 'history',
    rooms: Array.from(rooms.keys()).map(roomId => ({
      id: roomId,
      members: rooms.get(roomId)?.size || 0,
      history: messageHistory.get(roomId)?.slice(-50) || []
    }))
  }));
});

/**
 * Handle a 'join' request: add the socket to the room (creating it on first use), send the
 * room's recent history to the joiner, announce the join to the other members, and confirm.
 *
 * @param {WebSocket} ws - joining socket; reads `ws.user`, updates `ws.rooms`
 * @param {object} message - client message; reads `roomId` and optional `roomName`
 */
function handleJoinRoom(ws, message) {
  const { roomId, roomName } = message;

  if (!roomId) {
    ws.send(JSON.stringify({ type: 'error', message: 'roomId required' }));
    return;
  }

  // Map keys compare objects by identity, so an object/array roomId would create a new,
  // unreachable room on every join. Only primitive ids are accepted.
  if (typeof roomId !== 'string' && typeof roomId !== 'number') {
    ws.send(JSON.stringify({ type: 'error', message: 'roomId must be a string or number' }));
    return;
  }

  // Create the room if it does not exist yet
  if (!rooms.has(roomId)) {
    rooms.set(roomId, new Set());
  }

  const room = rooms.get(roomId);
  const alreadyMember = room.has(ws);
  room.add(ws);
  ws.rooms.add(roomId);

  // Send the latest 50 stored messages to the joiner
  const history = messageHistory.get(roomId) || [];
  ws.send(JSON.stringify({
    type: 'room_history',
    roomId,
    history: history.slice(-50)
  }));

  // Announce the join to the others — but not again when the socket was already a member
  if (!alreadyMember) {
    broadcastToRoom(roomId, {
      type: 'system',
      message: `${ws.user.username} joined the room`,
      timestamp: Date.now()
    }, ws);
  }

  ws.send(JSON.stringify({
    type: 'joined',
    roomId,
    roomName: roomName || roomId,
    members: room.size
  }));
}

/**
 * Handle a 'leave' request: remove the socket from the room and tell the remaining members.
 * Requests for unknown rooms, or for rooms the socket never joined, are ignored silently.
 *
 * @param {WebSocket} ws - leaving socket; reads `ws.user`, updates `ws.rooms`
 * @param {object} message - client message; reads `roomId`
 */
function handleLeaveRoom(ws, message) {
  const { roomId } = message;
  if (!roomId || !rooms.has(roomId)) return;

  const room = rooms.get(roomId);
  // Set#delete reports whether the socket was actually present
  const wasMember = room.delete(ws);
  ws.rooms.delete(roomId);

  // Without this guard any client could broadcast "left the room" into rooms it never joined
  if (!wasMember) return;

  broadcastToRoom(roomId, {
    type: 'system',
    message: `${ws.user.username} left the room`,
    timestamp: Date.now()
  }, ws);
}

/**
 * Handle a 'chat' message: validate it, store it in the room history and broadcast it to
 * every member of the room (the sender included, which serves as the delivery echo).
 *
 * @param {WebSocket} ws - sending socket; reads `ws.user` and `ws.rooms`
 * @param {object} message - client message; reads `roomId`, `content` and optional `id`
 */
function handleChatMessage(ws, message) {
  const { roomId, content, id } = message;

  if (!roomId || !content) {
    ws.send(JSON.stringify({ type: 'error', message: 'roomId and content required' }));
    return;
  }

  // content is relayed to every member and rendered as text, so only strings are accepted
  if (typeof content !== 'string') {
    ws.send(JSON.stringify({ type: 'error', message: 'content must be a string' }));
    return;
  }

  if (!rooms.has(roomId)) {
    ws.send(JSON.stringify({ type: 'error', message: 'Room not found' }));
    return;
  }

  // Only members may post; otherwise any authenticated client could write into any room
  if (!ws.rooms.has(roomId)) {
    ws.send(JSON.stringify({ type: 'error', message: 'Join the room before sending messages' }));
    return;
  }

  // Keep a client-supplied id only when it is a primitive; otherwise generate one
  const hasClientId = Boolean(id) && (typeof id === 'string' || typeof id === 'number');

  const chatMessage = {
    id: hasClientId ? id : `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`,
    type: 'chat',
    roomId,
    userId: ws.user.userId,
    username: ws.user.username,
    content,
    timestamp: Date.now()
  };

  // Store in the room history
  saveMessage(roomId, chatMessage);

  // Broadcast to everyone, sender included
  broadcastToRoom(roomId, chatMessage, null);
}

/**
 * Handle a 'typing' notification: relay the sender's typing state to the other room members.
 *
 * @param {WebSocket} ws - sending socket; reads `ws.user` and `ws.rooms`
 * @param {object} message - client message; reads `roomId` and `isTyping`
 */
function handleTyping(ws, message) {
  const { roomId, isTyping } = message;
  if (!roomId) return;

  // Only members may signal typing; otherwise any client could ping arbitrary rooms
  if (!ws.rooms.has(roomId)) return;

  broadcastToRoom(roomId, {
    type: 'typing',
    userId: ws.user.userId,
    username: ws.user.username,
    // Relay a plain boolean rather than whatever JSON value the client sent
    isTyping: Boolean(isTyping)
  }, ws);
}

// Liveness sweep, run every 30 seconds: a socket that has not answered the previous ping
// is terminated; every other socket is marked "not alive" and pinged again.
const heartbeatInterval = setInterval(() => {
  for (const socket of wss.clients) {
    if (socket.isAlive === false) {
      console.log(`Terminating dead connection: ${socket.user?.username}`);
      socket.terminate();
      continue;
    }
    // Flipped back to true by the socket's 'pong' handler
    socket.isAlive = false;
    socket.ping(() => {});
  }
}, 30000);

// Set once shutdown has begun so a repeated SIGINT/SIGTERM does not restart the sequence
let isShuttingDown = false;

/**
 * Graceful shutdown: stop the heartbeat, notify and close every client, then — after a
 * one-second grace period — drop whatever is still connected, close both servers and exit.
 */
function gracefulShutdown() {
  if (isShuttingDown) return;
  isShuttingDown = true;

  console.log('Shutting down gracefully...');

  clearInterval(heartbeatInterval);

  // Notify all clients and start the close handshake
  wss.clients.forEach(ws => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ type: 'server_shutdown' }));
      ws.close(1001, 'Server shutting down');
    }
  });

  // Give clients a moment to complete the close handshake
  setTimeout(() => {
    // Sockets that never answered the close frame would otherwise keep the
    // wss.close() callback from firing, so drop them forcibly.
    wss.clients.forEach(ws => ws.terminate());

    wss.close(() => {
      server.close(() => {
        process.exit(0);
      });
    });
  }, 1000);
}

process.on('SIGINT', gracefulShutdown);
process.on('SIGTERM', gracefulShutdown);

// Start listening; the WebSocket server shares this HTTP server, so one port serves both
server.listen(PORT, () => {
  console.log(`WebSocket server running on port ${PORT}`);
  console.log(`Health check: http://localhost:${PORT}/health`);
});

MonkeyCode生成的前端WebSocket客户端

<!-- index.html + client.js - generated by MonkeyCode -->
<!DOCTYPE html>
<html>
<head>
  <meta charset="UTF-8">
  <title>实时聊天室</title>
  <style>
    body { font-family: sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
    #status { padding: 8px; border-radius: 4px; margin-bottom: 10px; }
    .connected { background: #d4edda; color: #155724; }
    .disconnected { background: #f8d7da; color: #721c24; }
    #messages { height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px; margin-bottom: 10px; }
    .message { margin: 5px 0; }
    .system { color: #666; font-style: italic; }
    .my-message { text-align: right; }
    .typing-indicator { color: #999; font-size: 0.9em; }
  </style>
</head>
<body>
  <!-- Connection status banner; updateStatus() switches its class and text -->
  <div id="status" class="disconnected">未连接</div>
  <!-- JWT input plus connect / disconnect controls -->
  <div>
    <input id="token" placeholder="JWT Token" style="width: 300px;">
    <button onclick="connect()">连接</button>
    <button onclick="disconnect()">断开</button>
  </div>
  <!-- Room selection -->
  <div style="margin: 10px 0;">
    <input id="roomId" placeholder="房间ID" value="general">
    <button onclick="joinRoom()">加入房间</button>
  </div>
  <!-- Message log; addMessage() appends one child per message -->
  <div id="messages"></div>
  <!-- Message composer; pressing Enter sends as well -->
  <div>
    <input id="msgInput" placeholder="输入消息..." style="width: 300px;" onkeypress="if(event.key==='Enter')sendMessage()">
    <button onclick="sendMessage()">发送</button>
  </div>
  <!-- Typing indicator, filled in by showTyping() -->
  <div id="typing" class="typing-indicator"></div>
  <script>
    // Current WebSocket connection; null before the first connect and after a manual disconnect
    let ws = null;
    // Room id taken from the server's most recent 'joined' message
    let currentRoom = null;
    // Reconnect attempts since the last successful open; drives the backoff delay
    let reconnectAttempts = 0;
    // Handle of the pending reconnect timeout, if any
    let reconnectTimer = null;
    let messageQueue = []; // messages composed while offline, flushed when the socket opens
    const maxReconnectDelay = 30000; // upper bound for the backoff delay, in milliseconds
    
    // Reconnect with exponential backoff: wait 1s, 2s, 4s, ... (capped at maxReconnectDelay)
    // before calling connect() again, and give up once 10 attempts have been made.
    function scheduleReconnect() {
      const exhausted = reconnectAttempts >= 10;
      if (exhausted) {
        addMessage('system', '重连失败,请手动刷新页面');
        return;
      }

      const backoffMs = Math.min(2 ** reconnectAttempts * 1000, maxReconnectDelay);
      const seconds = Math.round(backoffMs / 1000);
      addMessage('system', `连接断开,${seconds}秒后重试...`);

      reconnectTimer = setTimeout(function retry() {
        reconnectAttempts += 1;
        connect();
      }, backoffMs);
    }
    
    /**
     * Open a WebSocket connection authenticated with the token from the #token input.
     *
     * Any previous socket is shut down first, so calling this repeatedly (button clicks or
     * reconnect attempts) never leaves two live connections. Once open, the client re-joins
     * the room it was in and flushes messages queued while offline.
     */
    function connect() {
      const token = document.getElementById('token').value;
      if (!token) { alert('请输入JWT Token'); return; }

      if (reconnectTimer) {
        clearTimeout(reconnectTimer);
        reconnectTimer = null;
      }

      // Detach and close the previous socket; clearing its handlers keeps its late
      // close event from updating the status or scheduling another reconnect.
      if (ws) {
        const stale = ws;
        stale.onopen = null;
        stale.onmessage = null;
        stale.onclose = null;
        stale.onerror = null;
        stale.close(1000, 'Replaced by a new connection');
      }

      const socket = new WebSocket(`ws://localhost:8080?token=${encodeURIComponent(token)}`);
      ws = socket;

      socket.onopen = () => {
        console.log('WebSocket connected');
        reconnectAttempts = 0;
        updateStatus(true);

        // Room membership lives on the server-side socket, so a new connection has to
        // join again. The server handles frames in order, so the join lands before the
        // queued chat messages sent right after it.
        if (currentRoom) {
          socket.send(JSON.stringify({ type: 'join', roomId: currentRoom }));
        }

        // Send the messages cached while offline
        while (messageQueue.length > 0) {
          const msg = messageQueue.shift();
          socket.send(JSON.stringify(msg));
        }
      };

      socket.onmessage = (event) => {
        let message;
        try {
          message = JSON.parse(event.data);
        } catch (err) {
          // A malformed frame must not break the handler for later messages
          console.error('Ignoring malformed server message:', err);
          return;
        }
        handleMessage(message);
      };

      socket.onclose = (event) => {
        console.log('WebSocket disconnected:', event.code, event.reason);
        updateStatus(false);

        if (event.code === 1008) {
          // The server rejected the token; retrying with the same token cannot succeed
          addMessage('system', '认证失败,请检查Token后重新连接');
          return;
        }

        if (event.code !== 1000 && event.code !== 1001) {
          // Abnormal closure: try to reconnect
          scheduleReconnect();
        }
      };

      socket.onerror = (err) => {
        console.error('WebSocket error:', err);
      };
    }
    
    // Manual disconnect: cancel any pending automatic reconnect, then close the socket with
    // the normal-closure code (1000) so the close handler does not schedule a new attempt.
    function disconnect() {
      // Without this, a reconnect already scheduled by the backoff logic would still fire
      // and silently reconnect after the user asked to disconnect.
      if (reconnectTimer) {
        clearTimeout(reconnectTimer);
        reconnectTimer = null;
      }

      if (ws) {
        ws.close(1000, 'User disconnected');
        ws = null;
      }
    }
    
    // Reflect the connection state in the #status banner (CSS class and label).
    function updateStatus(connected) {
      const banner = document.getElementById('status');
      if (connected) {
        banner.className = 'connected';
        banner.textContent = '已连接';
      } else {
        banner.className = 'disconnected';
        banner.textContent = '未连接';
      }
    }
    
    /**
     * Dispatch one parsed server message by its `type`. Unknown types are ignored.
     *
     * @param {object} message - parsed server message
     */
    function handleMessage(message) {
      // Render stored chat messages. They get the same "mine" flag as live messages, so
      // the user's own history is aligned like the messages they just sent.
      const renderHistory = (history) => {
        const myUserId = getCurrentUserId();
        (history || []).forEach(msg => {
          addMessage('chat', msg.username, msg.content, msg.timestamp, msg.userId === myUserId);
        });
      };

      switch (message.type) {
        case 'chat':
          addMessage('chat', message.username, message.content, message.timestamp, message.userId === getCurrentUserId());
          break;
        case 'system':
          addMessage('system', message.message);
          break;
        case 'history':
          // Snapshot sent on connect: one entry per room, each with its recent messages
          (message.rooms || []).forEach(room => renderHistory(room.history));
          break;
        case 'room_history':
          renderHistory(message.history);
          break;
        case 'joined':
          currentRoom = message.roomId;
          addMessage('system', `已加入房间: ${message.roomName || message.roomId}`);
          break;
        case 'typing':
          showTyping(message.username, message.isTyping);
          break;
        case 'error':
          addMessage('system', `错误: ${message.message}`);
          break;
      }
    }
    
    /**
     * Send the text in #msgInput to the current room. While the socket is not open the
     * message is queued instead and sent once the connection is (re)established.
     */
    function sendMessage() {
      const input = document.getElementById('msgInput');
      const content = input.value.trim();
      if (!content) return;

      // A chat message needs a target room; keep the text so it can be sent after joining
      if (!currentRoom) {
        addMessage('system', '请先加入房间');
        return;
      }

      const message = {
        type: 'chat',
        roomId: currentRoom,
        content,
        timestamp: Date.now(),
        id: generateId()
      };

      if (ws && ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify(message));
      } else {
        // Offline (no socket, connecting, or closed): cache the message for later.
        // Queued messages carry an id too, exactly like messages sent live.
        messageQueue.push(message);
        addMessage('system', '离线模式:消息已缓存,连接恢复后发送');
      }

      // Clear the box in both cases so a queued message is not submitted twice
      input.value = '';
    }
    
    // Ask the server to add this connection to the room named in the #roomId input.
    // currentRoom is only updated when the server's 'joined' confirmation arrives.
    function joinRoom() {
      const roomId = document.getElementById('roomId').value.trim();

      if (!ws || ws.readyState !== WebSocket.OPEN) {
        alert('请先连接WebSocket');
        return;
      }

      // Report an empty room id as such instead of asking the user to connect
      if (!roomId) {
        alert('请输入房间ID');
        return;
      }

      ws.send(JSON.stringify({ type: 'join', roomId }));
    }
    
    /**
     * Append one entry to the #messages log and scroll it into view.
     *
     * @param {string} type - 'system' or a chat type; also used as a CSS class
     * @param {string} author - sender name; for 'system' entries this is the text itself
     * @param {string} [content] - message body (chat entries only)
     * @param {number} [timestamp] - epoch milliseconds shown as local time, if given
     * @param {boolean} [isMine=false] - adds the 'my-message' class
     */
    function addMessage(type, author, content, timestamp, isMine = false) {
      const row = document.createElement('div');
      const mineClass = isMine ? 'my-message' : '';
      row.className = `message ${type} ${mineClass}`;

      if (type !== 'system') {
        const clock = timestamp ? new Date(timestamp).toLocaleTimeString() : '';
        // Both interpolated values are escaped before being inserted as HTML
        row.innerHTML = `<strong>${escapeHtml(author)}</strong> [${clock}]: ${escapeHtml(content)}`;
      } else {
        // System entries carry their text in the `author` argument
        row.textContent = author;
      }

      const list = document.getElementById('messages');
      list.appendChild(row);
      list.scrollTop = list.scrollHeight;
    }
    
    // Show "<username> is typing" in the #typing element, or clear it when typing stopped.
    function showTyping(username, isTyping) {
      const indicator = document.getElementById('typing');
      indicator.textContent = isTyping ? `${username} 正在输入...` : '';
    }
    
    // Typing notifications: announce "typing" once when a burst of input starts and
    // "stopped" after one second without input. Sending a frame per keystroke would count
    // against the server-side rate limit and could get real chat messages rejected.
    let typingTimer = null;
    let typingAnnounced = false;
    document.getElementById('msgInput').addEventListener('input', () => {
      if (!ws || ws.readyState !== WebSocket.OPEN || !currentRoom) return;

      // Remember the room so the "stopped" notice goes to the room that saw "typing"
      const room = currentRoom;

      if (!typingAnnounced) {
        ws.send(JSON.stringify({ type: 'typing', roomId: room, isTyping: true }));
        typingAnnounced = true;
      }

      clearTimeout(typingTimer);
      typingTimer = setTimeout(() => {
        typingAnnounced = false;
        // The socket may have been closed or cleared during the idle second
        if (!ws || ws.readyState !== WebSocket.OPEN) return;
        ws.send(JSON.stringify({ type: 'typing', roomId: room, isTyping: false }));
      }, 1000);
    });
    
    // Build a client-side message id: "<epoch millis>-<up to 9 random base-36 characters>".
    function generateId() {
      const randomPart = Math.random().toString(36).slice(2, 11);
      return `${Date.now()}-${randomPart}`;
    }
    
    /**
     * Read the `userId` claim from the JWT in the #token input.
     * The token is only decoded, not verified — verification is the server's job.
     *
     * @returns {*} the payload's userId, or null when the token cannot be decoded
     */
    function getCurrentUserId() {
      const token = document.getElementById('token').value;
      try {
        const segment = token.split('.')[1];
        // JWT segments use the URL-safe base64 alphabet without padding; atob() only
        // understands the standard alphabet, so translate and re-pad first.
        const base64 = segment.replace(/-/g, '+').replace(/_/g, '/');
        const padded = base64.padEnd(Math.ceil(base64.length / 4) * 4, '=');
        // atob() yields one character per byte; decode those bytes as UTF-8 so
        // non-ASCII claims survive
        const bytes = Uint8Array.from(atob(padded), (ch) => ch.charCodeAt(0));
        const payload = JSON.parse(new TextDecoder().decode(bytes));
        return payload.userId;
      } catch {
        return null;
      }
    }
    
    // Escape text for insertion into innerHTML by letting the browser do the serialization:
    // the value is assigned as plain text and read back as HTML markup.
    function escapeHtml(text) {
      const div = document.createElement('div'); // scratch element, never attached to the page
      div.textContent = text;
      return div.innerHTML;
    }
  </script>
</body>
</html>

关键技术点解析

1. 心跳机制(Heartbeat)

WebSocket连接可能因为网络问题静默断开(“半开连接”)。解决方法:

// Server: ping every client once every 30 seconds
setInterval(() => {
  wss.clients.forEach(ws => {
    // No pong since the previous sweep: treat the connection as dead
    if (ws.isAlive === false) return ws.terminate();
    ws.isAlive = false;
    ws.ping(() => {});
  });
}, 30000);

// Server, per connection: a pong from the peer marks the socket as alive again.
// NOTE(review): the original comment labelled this as client code, but `ws.on('pong')` is
// the Node `ws` API used on the server; browser clients expose no ping/pong events.
ws.on('pong', () => { ws.isAlive = true; });

2. 指数退避重连

网络抖动时,如果所有客户端都立刻重连,大量并发请求会压垮服务器(雪崩)。正确做法是指数退避:

// Number of reconnect attempts made so far
let reconnectAttempts = 0;

// Schedule the next connect() call, doubling the wait each time: 1s, 2s, 4s, ... capped at 30s
function scheduleReconnect() {
  const backoffMs = Math.min(2 ** reconnectAttempts * 1000, 30000);
  setTimeout(function retry() {
    reconnectAttempts += 1;
    connect();
  }, backoffMs);
}

退避序列:1秒 → 2秒 → 4秒 → 8秒 → … → 最长30秒。

3. 消息队列(离线缓存)

用户网络断开时,发送的消息不能丢失:

// Messages composed while the socket is not open, kept in send order
let messageQueue = [];

// Send immediately when connected; otherwise keep the message for later
function sendMessage(content) {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ type: 'chat', content }));
  } else {
    // Offline: cache the message
    messageQueue.push({ type: 'chat', content });
  }
}

// After a successful (re)connect, send the cached messages oldest first
ws.onopen = () => {
  while (messageQueue.length > 0) {
    ws.send(JSON.stringify(messageQueue.shift()));
  }
};

4. 速率限制(防刷屏)

// userId -> timestamps of the messages accepted within the current window
const rateLimits = new Map();

// Allow at most 10 messages per user in any 10-second window.
// Returns true (and records the message) when accepted, false when the limit is reached.
function checkRateLimit(userId) {
  const currentTime = Date.now();

  // Keep only the timestamps from the last 10 seconds
  const recent = [];
  for (const sentAt of rateLimits.get(userId) || []) {
    if (currentTime - sentAt < 10000) {
      recent.push(sentAt);
    }
  }

  if (recent.length >= 10) {
    return false;
  }

  recent.push(currentTime);
  rateLimits.set(userId, recent);
  return true;
}

运行和测试

# Install dependencies
npm install ws jsonwebtoken

# Start the server
JWT_SECRET=mysecret PORT=8080 node server.js

# Generate a test JWT (in another terminal); the secret must match JWT_SECRET above
node -e "const jwt = require('jsonwebtoken'); console.log(jwt.sign({userId:'u1',username:'test'}, 'mysecret', {expiresIn:'1h'}))"

# Copy the printed token, paste it into the token input on the page, then click "连接"

用两个浏览器标签打开index.html,分别用不同用户登录,就能实时聊天了。

用MonkeyCode扩展功能

当前实现是基础版,可以继续让MonkeyCode添加:

  1. 私聊功能:/msg @username 内容
  2. 消息回执:已送达/已读状态
  3. 房间权限:密码房间、管理员
  4. 消息搜索:在服务器端实现全文搜索
  5. 文件传输:用WebSocket发送二进制数据

只需要在初始prompt里加上这些需求,MonkeyCode会生成对应的代码。


WebSocket看起来复杂,但用MonkeyCode拆解成“认证→心跳→房间→消息→重连”几个模块后,每个部分都是几十行代码。关键是需求描述要结构化,不要说“帮我写个聊天室”,而要说“帮我实现WebSocket服务器,包含认证、心跳、房间、速率限制”。

文章摘自:https://www.cnblogs.com/jaryn/p/20218876