如题所述
第1个回答 2024-04-13
当需要通过WebSocket发送群消息并确保在出现错误时能够继续执行,一个稳健的解决方案是结合WebSocket和Redis。WebSocket用于实现实时双向通信,而Redis则作为消息队列,提供高可用性和错误处理策略。以下是关键代码片段和配置,展示了如何处理连接、消息发送和错误处理:
```java
// WebSocket处理器
public class WebSocketHandler extends AbstractWebSocketHandler {
private ConcurrentHashMap sessionMap;
private StringRedisTemplate stringRedisTemplate;
// 连接建立后,存储用户信息
@Override
public void afterConnectionEstablished(WebSocketSession session) {
String userId = session.getAttribute(UserInfo.ID).toString();
sessionMap.put(userId, session);
}
// 发送消息到特定用户或群组
public void sendMessage(MessageBo messageBo, List targetUsers) {
for (String userId : targetUsers) {
sendMessageToUser(userId, messageBo);
}
}
// 发送消息到单个用户
private void sendMessageToUser(String userId, MessageBo messageBo) {
WebSocketSession session = sessionMap.get(userId);
if (session != null) {
session.sendMessage(new TextMessage(messageBo.getMessage()));
} else {
// 如果用户未在线,将消息存入Redis,由监听器处理
stringRedisTemplate.opsForValue().set(userId, JSON.toJSONString(messageBo), RedisMessageTimeout);
}
}
// WebSocket错误处理
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
// 记录错误并尝试重新发送未送达的消息
log.error("WebSocket connection error", exception);
List pendingMessages = sessionMap.computeIfAbsent(session.getId(), id -> {
// 获取session时,可能已断开连接,这时重新获取未发送的消息
// 注意:这里仅示例,实际操作应根据错误处理策略来实现
return ...;
});
if (pendingMessages != null) {
for (MessageBo message : pendingMessages) {
sendMessageToUser(session.getAttribute(UserInfo.ID).toString(), message);
}
}
}
// Redis消息监听器
@Component
public class RedisMessageSubListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
MessageBo receivedMessage = JSON.parseObject(new String(bytes), MessageBo.class);
// 处理接收到的Redis消息
// ...
}
}
}
```
在这个示例中,当WebSocket发送消息时,如果遇到错误,会尝试将未发送的消息暂存到Redis,由Redis监听器在后台处理。这样可以确保即使某个用户连接断开,其他用户仍能收到消息。同时,Redis还可以用于负载均衡和消息持久化,增加系统的稳定性和可靠性。
```java
// WebSocket处理器
public class WebSocketHandler extends AbstractWebSocketHandler {
private ConcurrentHashMap sessionMap;
private StringRedisTemplate stringRedisTemplate;
// 连接建立后,存储用户信息
@Override
public void afterConnectionEstablished(WebSocketSession session) {
String userId = session.getAttribute(UserInfo.ID).toString();
sessionMap.put(userId, session);
}
// 发送消息到特定用户或群组
public void sendMessage(MessageBo messageBo, List targetUsers) {
for (String userId : targetUsers) {
sendMessageToUser(userId, messageBo);
}
}
// 发送消息到单个用户
private void sendMessageToUser(String userId, MessageBo messageBo) {
WebSocketSession session = sessionMap.get(userId);
if (session != null) {
session.sendMessage(new TextMessage(messageBo.getMessage()));
} else {
// 如果用户未在线,将消息存入Redis,由监听器处理
stringRedisTemplate.opsForValue().set(userId, JSON.toJSONString(messageBo), RedisMessageTimeout);
}
}
// WebSocket错误处理
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
// 记录错误并尝试重新发送未送达的消息
log.error("WebSocket connection error", exception);
List pendingMessages = sessionMap.computeIfAbsent(session.getId(), id -> {
// 获取session时,可能已断开连接,这时重新获取未发送的消息
// 注意:这里仅示例,实际操作应根据错误处理策略来实现
return ...;
});
if (pendingMessages != null) {
for (MessageBo message : pendingMessages) {
sendMessageToUser(session.getAttribute(UserInfo.ID).toString(), message);
}
}
}
// Redis消息监听器
@Component
public class RedisMessageSubListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
MessageBo receivedMessage = JSON.parseObject(new String(bytes), MessageBo.class);
// 处理接收到的Redis消息
// ...
}
}
}
```
在这个示例中,当WebSocket发送消息时,如果遇到错误,会尝试将未发送的消息暂存到Redis,由Redis监听器在后台处理。这样可以确保即使某个用户连接断开,其他用户仍能收到消息。同时,Redis还可以用于负载均衡和消息持久化,增加系统的稳定性和可靠性。