关于解决WebSocket连接断开自动重连的问题

场景

项目中用WebSocket做实时消息推送,但是网络不稳定的时候连接容易断开

断开之后用户得刷新页面才能重新连接,体验不太好

所以需要实现断线自动重连的功能

前端实现

基本封装

先封装一个WebSocket类,支持重连:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class ReconnectWebSocket {
constructor(url, options = {}) {
this.url = url;
this.reconnectInterval = options.reconnectInterval || 3000; // 重连间隔
this.maxReconnectAttempts = options.maxReconnectAttempts || 10; // 最大重连次数
this.reconnectAttempts = 0;
this.isManualClose = false; // 是否手动关闭

this.onMessage = options.onMessage || function() {};
this.onOpen = options.onOpen || function() {};
this.onClose = options.onClose || function() {};
this.onError = options.onError || function() {};

this.connect();
}

connect() {
this.ws = new WebSocket(this.url);

this.ws.onopen = (event) => {
console.log('WebSocket连接成功');
this.reconnectAttempts = 0; // 重置重连次数
this.onOpen(event);
};

this.ws.onmessage = (event) => {
this.onMessage(event);
};

this.ws.onclose = (event) => {
console.log('WebSocket连接关闭');
this.onClose(event);

// 如果不是手动关闭,则尝试重连
if (!this.isManualClose) {
this.reconnect();
}
};

this.ws.onerror = (event) => {
console.error('WebSocket错误:', event);
this.onError(event);
};
}

reconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})...`);

setTimeout(() => {
this.connect();
}, this.reconnectInterval);
} else {
console.error('超过最大重连次数,停止重连');
}
}

send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(data);
} else {
console.error('WebSocket未连接,无法发送消息');
}
}

close() {
this.isManualClose = true;
this.ws.close();
}
}

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 创建WebSocket连接
const ws = new ReconnectWebSocket('ws://localhost:8080/ws', {
reconnectInterval: 3000,
maxReconnectAttempts: 10,

onMessage: (event) => {
console.log('收到消息:', event.data);
// 处理消息
},

onOpen: (event) => {
console.log('连接已建立');
// 连接成功后的操作
},

onClose: (event) => {
console.log('连接已关闭');
// 关闭后的操作
},

onError: (event) => {
console.error('连接错误:', event);
// 错误处理
}
});

// 发送消息
ws.send(JSON.stringify({
type: 'chat',
content: 'Hello'
}));

// 手动关闭连接
// ws.close();

心跳检测

网络有时候会出现假死的情况,需要用心跳检测:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class HeartbeatWebSocket {
constructor(url, options = {}) {
this.url = url;
this.heartbeatInterval = options.heartbeatInterval || 30000; // 心跳间隔
this.heartbeatTimeout = options.heartbeatTimeout || 5000; // 心跳超时
this.heartbeatTimer = null;
this.heartbeatTimeoutTimer = null;

this.onMessage = options.onMessage || function() {};
// ... 其他配置

this.connect();
}

connect() {
this.ws = new WebSocket(this.url);

this.ws.onopen = (event) => {
console.log('WebSocket连接成功');
this.startHeartbeat();
this.onOpen(event);
};

this.ws.onmessage = (event) => {
// 如果收到心跳响应,重置超时定时器
if (event.data === 'pong') {
this.resetHeartbeatTimeout();
return;
}
this.onMessage(event);
};

this.ws.onclose = (event) => {
this.stopHeartbeat();
if (!this.isManualClose) {
this.reconnect();
}
};

this.ws.onerror = (event) => {
console.error('WebSocket错误:', event);
this.onError(event);
};
}

startHeartbeat() {
this.stopHeartbeat(); // 先清除之前的定时器

this.heartbeatTimer = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send('ping');
this.resetHeartbeatTimeout();
}
}, this.heartbeatInterval);
}

stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
if (this.heartbeatTimeoutTimer) {
clearTimeout(this.heartbeatTimeoutTimer);
this.heartbeatTimeoutTimer = null;
}
}

resetHeartbeatTimeout() {
if (this.heartbeatTimeoutTimer) {
clearTimeout(this.heartbeatTimeoutTimer);
}

this.heartbeatTimeoutTimer = setTimeout(() => {
console.warn('心跳超时,关闭连接');
this.ws.close();
}, this.heartbeatTimeout);
}

send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(data);
}
}

close() {
this.isManualClose = true;
this.stopHeartbeat();
this.ws.close();
}
}

后端实现

WebSocket配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/ws")
.setAllowedOrigins("*");
}

@Bean
public WebSocketHandler myHandler() {
return new MyWebSocketHandler();
}
}

WebSocket处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Component
public class MyWebSocketHandler extends TextWebSocketHandler {

private static final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String sessionId = session.getId();
sessions.put(sessionId, session);
System.out.println("WebSocket连接建立: " + sessionId);

// 发送连接成功消息
session.sendMessage(new TextMessage("连接成功"));
}

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();

// 处理心跳
if ("ping".equals(payload)) {
session.sendMessage(new TextMessage("pong"));
return;
}

// 处理业务消息
System.out.println("收到消息: " + payload);

// 可以在这里广播消息给所有连接
broadcast("用户 " + session.getId() + " 说: " + payload);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String sessionId = session.getId();
sessions.remove(sessionId);
System.out.println("WebSocket连接关闭: " + sessionId + ", 状态: " + status);
}

@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.err.println("WebSocket传输错误: " + exception.getMessage());
if (session.isOpen()) {
session.close();
}
}

// 广播消息给所有连接
public void broadcast(String message) {
TextMessage textMessage = new TextMessage(message);
sessions.values().forEach(session -> {
try {
if (session.isOpen()) {
session.sendMessage(textMessage);
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}

心跳检测

也可以在服务端检测心跳超时:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class HeartbeatScheduler {

@Autowired
private MyWebSocketHandler webSocketHandler;

// 每30秒检查一次连接
@Scheduled(cron = "0/30 * * * * ?")
public void checkHeartbeat() {
// 遍历所有连接,检查最后活跃时间
// 如果超过一定时间没有收到心跳,则关闭连接
}
}

优化的重连策略

指数退避算法

重连间隔逐渐增加,避免频繁重连:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class ExponentialBackoffWebSocket {
constructor(url, options = {}) {
this.url = url;
this.baseReconnectInterval = options.baseReconnectInterval || 1000;
this.maxReconnectInterval = options.maxReconnectInterval || 30000;
this.reconnectInterval = this.baseReconnectInterval;
this.maxReconnectAttempts = options.maxReconnectAttempts || 10;

// ...
}

reconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts}), 间隔: ${this.reconnectInterval}ms`);

setTimeout(() => {
this.connect();

// 指数退避,下次重连间隔翻倍
this.reconnectInterval = Math.min(
this.reconnectInterval * 2,
this.maxReconnectInterval
);
}, this.reconnectInterval);
}
}

// 连接成功后重置间隔
onOpen(event) {
this.reconnectInterval = this.baseReconnectInterval;
// ...
}
}

断线重连的消息恢复

如果需要在重连后恢复之前的消息,可以实现消息队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class QueueWebSocket extends ReconnectWebSocket {
constructor(url, options) {
super(url, options);
this.messageQueue = [];
this.isQueueEnabled = options.enableQueue !== false;
}

send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(data);
} else {
console.warn('WebSocket未连接,消息加入队列');
if (this.isQueueEnabled) {
this.messageQueue.push(data);
}
}
}

flushQueue() {
while (this.messageQueue.length > 0 && this.ws.readyState === WebSocket.OPEN) {
const message = this.messageQueue.shift();
this.ws.send(message);
}
}

connect() {
super.connect();
this.ws.onopen = (event) => {
super.onOpen(event);
// 连接成功后发送队列中的消息
this.flushQueue();
};
}
}

总结

WebSocket断线重连主要是在前端实现,后端配合做心跳检测

关键点:

  1. 封装WebSocket类,支持自动重连
  2. 使用心跳检测识别假死连接
  3. 指数退避避免频繁重连
  4. 消息队列保证重要消息不丢失
  5. 前后端配合实现完整的重连机制

暂时就先记录这么多