update 回滚错误的修改 record无法投递到redis序列化

This commit is contained in:
疯狂的狮子Li
2026-03-17 21:04:24 +08:00
parent fcfa5eb767
commit 9aed0b06ca
9 changed files with 73 additions and 38 deletions

View File

@@ -93,10 +93,9 @@ public class AuthController {
Long userId = LoginHelper.getUserId(); Long userId = LoginHelper.getUserId();
scheduledExecutorService.schedule(() -> { scheduledExecutorService.schedule(() -> {
SseMessageDTO dto = new SseMessageDTO( SseMessageDTO dto = new SseMessageDTO();
List.of(userId), dto.setUserIds(List.of(userId));
DateUtils.getTodayHour(new Date()) + "好,欢迎登录 RuoYi-Vue-Plus 后台管理系统" dto.setMessage(DateUtils.getTodayHour(new Date()) + "好,欢迎登录 RuoYi-Vue-Plus 后台管理系统");
);
SseMessageUtils.publishMessage(dto); SseMessageUtils.publishMessage(dto);
}, 5, TimeUnit.SECONDS); }, 5, TimeUnit.SECONDS);
return R.ok(loginVo); return R.ok(loginVo);

View File

@@ -205,10 +205,12 @@ public class SseEmitterManager {
* @param sseMessageDTO 要发布的SSE消息对象 * @param sseMessageDTO 要发布的SSE消息对象
*/ */
public void publishMessage(SseMessageDTO sseMessageDTO) { public void publishMessage(SseMessageDTO sseMessageDTO) {
SseMessageDTO broadcastMessage = new SseMessageDTO(sseMessageDTO.userIds(), sseMessageDTO.message()); SseMessageDTO broadcastMessage = new SseMessageDTO();
broadcastMessage.setUserIds(sseMessageDTO.getUserIds());
broadcastMessage.setMessage(sseMessageDTO.getMessage());
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
SSE_TOPIC, sseMessageDTO.userIds(), sseMessageDTO.message()); SSE_TOPIC, sseMessageDTO.getUserIds(), sseMessageDTO.getMessage());
}); });
} }
@@ -218,7 +220,8 @@ public class SseEmitterManager {
* @param message 要发布的消息内容 * @param message 要发布的消息内容
*/ */
public void publishAll(String message) { public void publishAll(String message) {
SseMessageDTO broadcastMessage = new SseMessageDTO(null, message); SseMessageDTO broadcastMessage = new SseMessageDTO();
broadcastMessage.setMessage(message);
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message); log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message);
}); });

View File

@@ -1,16 +1,29 @@
package org.dromara.common.sse.dto; package org.dromara.common.sse.dto;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
* 消息的DTO * 消息的dto
* *
* @param userIds 接收消息的用户 ID 列表
* @param message 推送消息内容
* @author zendwang * @author zendwang
*/ */
public record SseMessageDTO( @Data
List<Long> userIds, public class SseMessageDTO implements Serializable {
String message
) { @Serial
private static final long serialVersionUID = 1L;
/**
* 需要推送到的session key 列表
*/
private List<Long> userIds;
/**
* 需要发送的消息
*/
private String message;
} }

View File

@@ -28,14 +28,14 @@ public class SseTopicListener implements ApplicationRunner, Ordered {
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
sseEmitterManager.subscribeMessage((message) -> { sseEmitterManager.subscribeMessage((message) -> {
log.info("SSE主题订阅收到消息session keys={} message={}", message.userIds(), message.message()); log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage());
// 如果key不为空就按照key发消息 如果为空就群发 // 如果key不为空就按照key发消息 如果为空就群发
if (CollUtil.isNotEmpty(message.userIds())) { if (CollUtil.isNotEmpty(message.getUserIds())) {
message.userIds().forEach(key -> { message.getUserIds().forEach(key -> {
sseEmitterManager.sendMessage(key, message.message()); sseEmitterManager.sendMessage(key, message.getMessage());
}); });
} else { } else {
sseEmitterManager.sendMessage(message.message()); sseEmitterManager.sendMessage(message.getMessage());
} }
}); });
log.info("初始化SSE主题订阅监听器成功"); log.info("初始化SSE主题订阅监听器成功");

View File

@@ -1,16 +1,29 @@
package org.dromara.common.websocket.dto; package org.dromara.common.websocket.dto;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
* 消息的DTO * 消息的dto
* *
* @param sessionKeys WebSocket 会话标识列表
* @param message 推送消息内容
* @author zendwang * @author zendwang
*/ */
public record WebSocketMessageDTO( @Data
List<Long> sessionKeys, public class WebSocketMessageDTO implements Serializable {
String message
) { @Serial
private static final long serialVersionUID = 1L;
/**
* 需要推送到的session key 列表
*/
private List<Long> sessionKeys;
/**
* 需要发送的消息
*/
private String message;
} }

View File

@@ -51,7 +51,9 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler {
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
// 创建WebSocket消息DTO对象 // 创建WebSocket消息DTO对象
WebSocketMessageDTO messageDTO = new WebSocketMessageDTO(List.of(loginUser.getUserId()), message.getPayload()); WebSocketMessageDTO messageDTO = new WebSocketMessageDTO();
messageDTO.setSessionKeys(List.of(loginUser.getUserId()));
messageDTO.setMessage(message.getPayload());
WebSocketUtils.publishMessage(messageDTO); WebSocketUtils.publishMessage(messageDTO);
} }

View File

@@ -26,17 +26,17 @@ public class WebSocketTopicListener implements ApplicationRunner, Ordered {
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
// 订阅WebSocket消息 // 订阅WebSocket消息
WebSocketUtils.subscribeMessage((message) -> { WebSocketUtils.subscribeMessage((message) -> {
log.info("WebSocket主题订阅收到消息session keys={} message={}", message.sessionKeys(), message.message()); log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage());
// 如果key不为空就按照key发消息 如果为空就群发 // 如果key不为空就按照key发消息 如果为空就群发
if (CollUtil.isNotEmpty(message.sessionKeys())) { if (CollUtil.isNotEmpty(message.getSessionKeys())) {
message.sessionKeys().forEach(key -> { message.getSessionKeys().forEach(key -> {
if (WebSocketSessionHolder.existSession(key)) { if (WebSocketSessionHolder.existSession(key)) {
WebSocketUtils.sendMessage(key, message.message()); WebSocketUtils.sendMessage(key, message.getMessage());
} }
}); });
} else { } else {
WebSocketSessionHolder.getSessionsAll().forEach(key -> { WebSocketSessionHolder.getSessionsAll().forEach(key -> {
WebSocketUtils.sendMessage(key, message.message()); WebSocketUtils.sendMessage(key, message.getMessage());
}); });
} }
}); });

View File

@@ -56,19 +56,21 @@ public class WebSocketUtils {
public static void publishMessage(WebSocketMessageDTO webSocketMessage) { public static void publishMessage(WebSocketMessageDTO webSocketMessage) {
List<Long> unsentSessionKeys = new ArrayList<>(); List<Long> unsentSessionKeys = new ArrayList<>();
// 当前服务内session,直接发送消息 // 当前服务内session,直接发送消息
for (Long sessionKey : webSocketMessage.sessionKeys()) { for (Long sessionKey : webSocketMessage.getSessionKeys()) {
if (WebSocketSessionHolder.existSession(sessionKey)) { if (WebSocketSessionHolder.existSession(sessionKey)) {
WebSocketUtils.sendMessage(sessionKey, webSocketMessage.message()); WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage());
continue; continue;
} }
unsentSessionKeys.add(sessionKey); unsentSessionKeys.add(sessionKey);
} }
// 不在当前服务内session,发布订阅消息 // 不在当前服务内session,发布订阅消息
if (CollUtil.isNotEmpty(unsentSessionKeys)) { if (CollUtil.isNotEmpty(unsentSessionKeys)) {
WebSocketMessageDTO broadcastMessage = new WebSocketMessageDTO(unsentSessionKeys, webSocketMessage.message()); WebSocketMessageDTO broadcastMessage = new WebSocketMessageDTO();
broadcastMessage.setSessionKeys(unsentSessionKeys);
broadcastMessage.setMessage(webSocketMessage.getMessage());
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}", log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}",
WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.message()); WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
}); });
} }
} }
@@ -79,7 +81,8 @@ public class WebSocketUtils {
* @param message 要发布的消息内容 * @param message 要发布的消息内容
*/ */
public static void publishAll(String message) { public static void publishAll(String message) {
WebSocketMessageDTO broadcastMessage = new WebSocketMessageDTO(null, message); WebSocketMessageDTO broadcastMessage = new WebSocketMessageDTO();
broadcastMessage.setMessage(message);
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message); log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
}); });

View File

@@ -92,7 +92,9 @@ public class FlwCommonServiceImpl implements IFlwCommonService {
try { try {
switch (messageTypeEnum) { switch (messageTypeEnum) {
case SYSTEM_MESSAGE -> { case SYSTEM_MESSAGE -> {
SseMessageDTO dto = new SseMessageDTO(userIds, message); SseMessageDTO dto = new SseMessageDTO();
dto.setUserIds(userIds);
dto.setMessage(message);
SseMessageUtils.publishMessage(dto); SseMessageUtils.publishMessage(dto);
} }
case EMAIL_MESSAGE -> MailUtils.sendText(emails, subject, message); case EMAIL_MESSAGE -> MailUtils.sendText(emails, subject, message);