场景
项目中用WebSocket做实时消息推送,但是网络不稳定的时候连接容易断开
断开之后用户得刷新页面才能重新连接,体验不太好
所以需要实现断线自动重连的功能
前端实现
基本封装
先封装一个WebSocket类,支持重连:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| class ReconnectWebSocket { constructor(url, options = {}) { this.url = url; this.reconnectInterval = options.reconnectInterval || 3000; this.maxReconnectAttempts = options.maxReconnectAttempts || 10; this.reconnectAttempts = 0; this.isManualClose = false;
this.onMessage = options.onMessage || function() {}; this.onOpen = options.onOpen || function() {}; this.onClose = options.onClose || function() {}; this.onError = options.onError || function() {};
this.connect(); }
connect() { this.ws = new WebSocket(this.url);
this.ws.onopen = (event) => { console.log('WebSocket连接成功'); this.reconnectAttempts = 0; this.onOpen(event); };
this.ws.onmessage = (event) => { this.onMessage(event); };
this.ws.onclose = (event) => { console.log('WebSocket连接关闭'); this.onClose(event);
if (!this.isManualClose) { this.reconnect(); } };
this.ws.onerror = (event) => { console.error('WebSocket错误:', event); this.onError(event); }; }
reconnect() { if (this.reconnectAttempts < this.maxReconnectAttempts) { this.reconnectAttempts++; console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})...`);
setTimeout(() => { this.connect(); }, this.reconnectInterval); } else { console.error('超过最大重连次数,停止重连'); } }
send(data) { if (this.ws.readyState === WebSocket.OPEN) { this.ws.send(data); } else { console.error('WebSocket未连接,无法发送消息'); } }
close() { this.isManualClose = true; this.ws.close(); } }
|
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| const ws = new ReconnectWebSocket('ws://localhost:8080/ws', { reconnectInterval: 3000, maxReconnectAttempts: 10,
onMessage: (event) => { console.log('收到消息:', event.data); },
onOpen: (event) => { console.log('连接已建立'); },
onClose: (event) => { console.log('连接已关闭'); },
onError: (event) => { console.error('连接错误:', event); } });
ws.send(JSON.stringify({ type: 'chat', content: 'Hello' }));
|
心跳检测
网络有时候会出现假死的情况,需要用心跳检测:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| class HeartbeatWebSocket { constructor(url, options = {}) { this.url = url; this.heartbeatInterval = options.heartbeatInterval || 30000; this.heartbeatTimeout = options.heartbeatTimeout || 5000; this.heartbeatTimer = null; this.heartbeatTimeoutTimer = null;
this.onMessage = options.onMessage || function() {};
this.connect(); }
connect() { this.ws = new WebSocket(this.url);
this.ws.onopen = (event) => { console.log('WebSocket连接成功'); this.startHeartbeat(); this.onOpen(event); };
this.ws.onmessage = (event) => { if (event.data === 'pong') { this.resetHeartbeatTimeout(); return; } this.onMessage(event); };
this.ws.onclose = (event) => { this.stopHeartbeat(); if (!this.isManualClose) { this.reconnect(); } };
this.ws.onerror = (event) => { console.error('WebSocket错误:', event); this.onError(event); }; }
startHeartbeat() { this.stopHeartbeat();
this.heartbeatTimer = setInterval(() => { if (this.ws.readyState === WebSocket.OPEN) { this.ws.send('ping'); this.resetHeartbeatTimeout(); } }, this.heartbeatInterval); }
stopHeartbeat() { if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = null; } if (this.heartbeatTimeoutTimer) { clearTimeout(this.heartbeatTimeoutTimer); this.heartbeatTimeoutTimer = null; } }
resetHeartbeatTimeout() { if (this.heartbeatTimeoutTimer) { clearTimeout(this.heartbeatTimeoutTimer); }
this.heartbeatTimeoutTimer = setTimeout(() => { console.warn('心跳超时,关闭连接'); this.ws.close(); }, this.heartbeatTimeout); }
send(data) { if (this.ws.readyState === WebSocket.OPEN) { this.ws.send(data); } }
close() { this.isManualClose = true; this.stopHeartbeat(); this.ws.close(); } }
|
后端实现
WebSocket配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer {
@Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(myHandler(), "/ws") .setAllowedOrigins("*"); }
@Bean public WebSocketHandler myHandler() { return new MyWebSocketHandler(); } }
|
WebSocket处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| @Component public class MyWebSocketHandler extends TextWebSocketHandler {
private static final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
@Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String sessionId = session.getId(); sessions.put(sessionId, session); System.out.println("WebSocket连接建立: " + sessionId);
session.sendMessage(new TextMessage("连接成功")); }
@Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { String payload = message.getPayload();
if ("ping".equals(payload)) { session.sendMessage(new TextMessage("pong")); return; }
System.out.println("收到消息: " + payload);
broadcast("用户 " + session.getId() + " 说: " + payload); }
@Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { String sessionId = session.getId(); sessions.remove(sessionId); System.out.println("WebSocket连接关闭: " + sessionId + ", 状态: " + status); }
@Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { System.err.println("WebSocket传输错误: " + exception.getMessage()); if (session.isOpen()) { session.close(); } }
public void broadcast(String message) { TextMessage textMessage = new TextMessage(message); sessions.values().forEach(session -> { try { if (session.isOpen()) { session.sendMessage(textMessage); } } catch (IOException e) { e.printStackTrace(); } }); } }
|
心跳检测
也可以在服务端检测心跳超时:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component public class HeartbeatScheduler {
@Autowired private MyWebSocketHandler webSocketHandler;
@Scheduled(cron = "0/30 * * * * ?") public void checkHeartbeat() { } }
|
优化的重连策略
指数退避算法
重连间隔逐渐增加,避免频繁重连:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| class ExponentialBackoffWebSocket { constructor(url, options = {}) { this.url = url; this.baseReconnectInterval = options.baseReconnectInterval || 1000; this.maxReconnectInterval = options.maxReconnectInterval || 30000; this.reconnectInterval = this.baseReconnectInterval; this.maxReconnectAttempts = options.maxReconnectAttempts || 10;
}
reconnect() { if (this.reconnectAttempts < this.maxReconnectAttempts) { this.reconnectAttempts++; console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts}), 间隔: ${this.reconnectInterval}ms`);
setTimeout(() => { this.connect();
this.reconnectInterval = Math.min( this.reconnectInterval * 2, this.maxReconnectInterval ); }, this.reconnectInterval); } }
onOpen(event) { this.reconnectInterval = this.baseReconnectInterval; } }
|
断线重连的消息恢复
如果需要在重连后恢复之前的消息,可以实现消息队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| class QueueWebSocket extends ReconnectWebSocket { constructor(url, options) { super(url, options); this.messageQueue = []; this.isQueueEnabled = options.enableQueue !== false; }
send(data) { if (this.ws.readyState === WebSocket.OPEN) { this.ws.send(data); } else { console.warn('WebSocket未连接,消息加入队列'); if (this.isQueueEnabled) { this.messageQueue.push(data); } } }
flushQueue() { while (this.messageQueue.length > 0 && this.ws.readyState === WebSocket.OPEN) { const message = this.messageQueue.shift(); this.ws.send(message); } }
connect() { super.connect(); this.ws.onopen = (event) => { super.onOpen(event); this.flushQueue(); }; } }
|
总结
WebSocket断线重连主要是在前端实现,后端配合做心跳检测
关键点:
- 封装WebSocket类,支持自动重连
- 使用心跳检测识别假死连接
- 指数退避避免频繁重连
- 消息队列保证重要消息不丢失
- 前后端配合实现完整的重连机制
暂时就先记录这么多