From 920d717cd0ff1600f83f2cc9dc8cd20aa9c3ed62 Mon Sep 17 00:00:00 2001 From: AprilWind <2100166581@qq.com> Date: Mon, 30 Mar 2026 14:36:54 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E5=A2=9E=E5=8A=A0=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E6=8E=A8=E9=80=81=E6=A8=A1=E5=9D=97=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/core/service/MessageService.java | 39 +++++ ruoyi-common/ruoyi-common-push/pom.xml | 2 +- .../push/config/MessageSseConfiguration.java | 20 +++ .../config/MessageWebSocketConfiguration.java | 20 +++ .../push/constant/MessageConstants.java | 15 ++ .../common/push/core/PushSessionManager.java | 27 +++ .../push/core/WebSocketSessionManager.java | 90 ++++++++++ .../push/enums/MessageTransportEnum.java | 25 +++ .../push/handler/PlusWebSocketHandler.java | 50 +++++- .../common/push/helper/PushHelper.java | 61 +++++++ .../interceptor/PlusWebSocketInterceptor.java | 23 ++- .../push/listener/MessageTopicListener.java | 17 ++ .../system/service/ISysMessageService.java | 48 +++--- .../service/impl/SysMessageServiceImpl.java | 156 ++++++++++++++++++ 14 files changed, 566 insertions(+), 27 deletions(-) diff --git a/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/service/MessageService.java b/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/service/MessageService.java index 275b99aad..aa3487716 100644 --- a/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/service/MessageService.java +++ b/ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/service/MessageService.java @@ -11,17 +11,56 @@ import java.util.List; */ public interface MessageService { + /** + * 发送指定用户文本消息 + * + * @param userId 目标用户ID + * @param message 文本消息内容 + */ void sendMessage(Long userId, String message); + /** + * 全局广播文本消息 + * + * @param message 文本消息内容 + */ void sendMessage(String message); + /** + * 发送指定用户自定义消息体 + * + * @param userId 目标用户ID + * @param payload 消息推送体 + */ void sendMessage(Long userId, PushPayloadDTO payload); + /** + * 全局广播自定义消息体 + * + * @param payload 消息推送体 + */ void sendMessage(PushPayloadDTO payload); + /** + * 批量发布消息给指定用户列表 + * + * @param userIds 用户ID集合 + * @param payload 消息推送体 + */ void publishMessage(List userIds, PushPayloadDTO payload); + /** + * 发布全局广播文本消息 + * + * @param message 文本消息内容 + */ void publishAll(String message); + /** + * 发布全局广播自定义消息体 + * + * @param payload 消息推送体 + */ void publishAll(PushPayloadDTO payload); + } diff --git a/ruoyi-common/ruoyi-common-push/pom.xml b/ruoyi-common/ruoyi-common-push/pom.xml index 4d888e05f..80929c3b5 100644 --- a/ruoyi-common/ruoyi-common-push/pom.xml +++ b/ruoyi-common/ruoyi-common-push/pom.xml @@ -12,7 +12,7 @@ ruoyi-common-push - ruoyi-common-push 模块 + ruoyi-common-push 消息推送模块 diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java index 02b9d78ff..2f14eb893 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java @@ -16,16 +16,36 @@ import org.springframework.context.annotation.Bean; @ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "sse", matchIfMissing = true) public class MessageSseConfiguration { + /** + * 注册 SSE 会话管理器 + * 负责管理用户 SSE 连接、消息发送、会话清理 + * + * @return SseEmitterSessionManager 实例 + */ @Bean public SseEmitterSessionManager sseEmitterManager() { return new SseEmitterSessionManager(); } + /** + * 注册消息主题监听器 + * 监听 Redis 全局消息,用于集群环境下的消息分发 + * + * @param manager SSE 会话管理器 + * @return MessageTopicListener 实例 + */ @Bean public MessageTopicListener messageTopicListener(SseEmitterSessionManager manager) { return new MessageTopicListener(manager); } + /** + * 注册 SSE 控制器 + * 提供前端建立 SSE 连接的接口 + * + * @param manager SSE 会话管理器 + * @return SseController 实例 + */ @Bean public SseController sseController(SseEmitterSessionManager manager) { return new SseController(manager); diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java index b1bb6ec2e..b4bbd30eb 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java @@ -23,6 +23,10 @@ import org.springframework.web.socket.server.HandshakeInterceptor; @ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "websocket") public class MessageWebSocketConfiguration { + /** + * WebSocket 配置注册 + * 配置连接路径、拦截器、跨域 + */ @Bean public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, WebSocketHandler webSocketHandler, @@ -33,21 +37,37 @@ public class MessageWebSocketConfiguration { .setAllowedOrigins(messageProperties.getAllowedOrigins()); } + /** + * WebSocket 会话管理器 + * 负责连接管理、消息发送、定时清理失效会话 + */ @Bean public WebSocketSessionManager webSocketSessionManager() { return new WebSocketSessionManager(); } + /** + * WebSocket 握手拦截器 + * 建立连接前做登录校验、客户端ID校验 + */ @Bean public HandshakeInterceptor handshakeInterceptor() { return new PlusWebSocketInterceptor(); } + /** + * WebSocket 消息处理器 + * 处理连接、消息、心跳、断开、异常等事件 + */ @Bean public WebSocketHandler webSocketHandler(WebSocketSessionManager webSocketSessionManager) { return new PlusWebSocketHandler(webSocketSessionManager); } + /** + * 消息主题监听器 + * 订阅 Redis 消息,实现集群环境下的消息分发 + */ @Bean public MessageTopicListener messageTopicListener(WebSocketSessionManager webSocketSessionManager) { return new MessageTopicListener(webSocketSessionManager); diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/constant/MessageConstants.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/constant/MessageConstants.java index 3f66957af..cefec0238 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/constant/MessageConstants.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/constant/MessageConstants.java @@ -7,13 +7,28 @@ package org.dromara.common.push.constant; */ public interface MessageConstants { + /** + * 登录用户信息 + */ String LOGIN_USER_KEY = "loginUser"; + /** + * 登录令牌 + */ String LOGIN_TOKEN_KEY = "token"; + /** + * 全局消息订阅主题 + */ String MESSAGE_TOPIC = "global:message"; + /** + * 心跳请求标识 + */ String PING = "ping"; + /** + * 心跳响应标识 + */ String PONG = "pong"; } diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/PushSessionManager.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/PushSessionManager.java index bbe926ff1..4ee8c525f 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/PushSessionManager.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/PushSessionManager.java @@ -12,13 +12,40 @@ import java.util.function.Consumer; */ public interface PushSessionManager { + /** + * 订阅消息通道 + * 注册消息消费者,用于监听并处理消息推送事件 + * + * @param consumer 消息消费逻辑 + */ void subscribeMessage(Consumer consumer); + /** + * 发送消息给指定用户 + * + * @param userId 目标用户ID + * @param payload 消息体 + */ void sendMessage(Long userId, PushPayloadDTO payload); + /** + * 全局广播消息(所有在线用户) + * + * @param payload 消息体 + */ void sendMessage(PushPayloadDTO payload); + /** + * 批量发布消息给指定用户列表 + * + * @param pushDTO 推送参数封装对象 + */ void publishMessage(PushDTO pushDTO); + /** + * 全局广播消息(所有用户) + * + * @param payload 消息体 + */ void publishAll(PushPayloadDTO payload); } diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java index 1bde966e8..9655ef1fa 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java @@ -33,20 +33,44 @@ import static org.dromara.common.push.constant.MessageConstants.MESSAGE_TOPIC; @Slf4j public class WebSocketSessionManager implements PushSessionManager { + /** + * 用户会话存储集合 + * 结构:userId -> (token -> WebSocketSession) + * 支持同一用户多终端、多设备同时在线 + */ private static final Map> USER_TOKEN_SESSIONS = new ConcurrentHashMap<>(); + /** + * 构造函数 + * 初始化定时任务:每60秒执行一次会话监控,自动清理无效连接 + */ public WebSocketSessionManager() { SpringUtils.getBean(ScheduledExecutorService.class) .scheduleWithFixedDelay(this::sessionMonitor, 60L, 60L, TimeUnit.SECONDS); } + /** + * 用户建立WebSocket连接 + * + * @param userId 用户ID + * @param token 客户端唯一标识(区分不同设备/终端) + * @param session WebSocket会话对象 + */ public void connect(Long userId, String token, WebSocketSession session) { Map sessions = USER_TOKEN_SESSIONS.computeIfAbsent(userId, key -> new ConcurrentHashMap<>()); + // 移除并关闭旧的同token会话,避免重复连接 WebSocketSession oldSession = sessions.remove(token); closeSession(oldSession, CloseStatus.NORMAL); + // 存储新会话 sessions.put(token, session); } + /** + * 用户断开WebSocket连接 + * + * @param userId 用户ID + * @param token 客户端唯一标识 + */ public void disconnect(Long userId, String token) { if (userId == null || token == null) { return; @@ -56,12 +80,18 @@ public class WebSocketSessionManager implements PushSessionManager { USER_TOKEN_SESSIONS.remove(userId); return; } + // 移除指定token会话并关闭 closeSession(sessions.remove(token), CloseStatus.NORMAL); + // 该用户无任何会话时,从缓存中移除 if (sessions.isEmpty()) { USER_TOKEN_SESSIONS.remove(userId); } } + /** + * 会话监控定时任务 + * 定期清理已关闭、失效的WebSocket会话,防止内存泄漏 + */ public void sessionMonitor() { List toRemoveUsers = new ArrayList<>(); USER_TOKEN_SESSIONS.forEach((userId, sessionMap) -> { @@ -69,6 +99,7 @@ public class WebSocketSessionManager implements PushSessionManager { toRemoveUsers.add(userId); return; } + // 移除已关闭的无效会话 sessionMap.entrySet().removeIf(entry -> { WebSocketSession session = entry.getValue(); if (session == null || !session.isOpen()) { @@ -77,18 +108,32 @@ public class WebSocketSessionManager implements PushSessionManager { } return false; }); + // 无有效会话,标记用户待删除 if (sessionMap.isEmpty()) { toRemoveUsers.add(userId); } }); + // 批量清理无会话用户 toRemoveUsers.forEach(USER_TOKEN_SESSIONS::remove); } + /** + * 订阅消息通道 + * 注册消息消费者,监听Redis消息推送 + * + * @param consumer 消息消费逻辑 + */ @Override public void subscribeMessage(Consumer consumer) { RedisUtils.subscribe(MESSAGE_TOPIC, PushDTO.class, consumer); } + /** + * 向指定用户发送消息 + * + * @param userId 目标用户ID + * @param payload 消息体 + */ @Override public void sendMessage(Long userId, PushPayloadDTO payload) { if (payload == null) { @@ -99,24 +144,38 @@ public class WebSocketSessionManager implements PushSessionManager { USER_TOKEN_SESSIONS.remove(userId); return; } + // 发送消息并自动清理失效会话 sessions.entrySet().removeIf(entry -> { WebSocketSession session = entry.getValue(); if (session == null || !session.isOpen()) { closeSession(session, CloseStatus.NORMAL); return true; } + // 发送失败的会话也会被移除 return !sendMessage(session, new TextMessage(JsonUtils.toJsonString(payload))); }); + // 无有效会话则移除用户 if (sessions.isEmpty()) { USER_TOKEN_SESSIONS.remove(userId); } } + /** + * 向所有在线用户广播消息 + * + * @param payload 消息体 + */ @Override public void sendMessage(PushPayloadDTO payload) { USER_TOKEN_SESSIONS.keySet().forEach(userId -> sendMessage(userId, payload)); } + /** + * 发布消息到Redis订阅通道 + * 支持集群环境下的分布式消息推送 + * + * @param pushDTO 推送消息封装对象 + */ @Override public void publishMessage(PushDTO pushDTO) { RedisUtils.publish(MESSAGE_TOPIC, pushDTO, consumer -> log.info( @@ -127,6 +186,11 @@ public class WebSocketSessionManager implements PushSessionManager { )); } + /** + * 全局广播消息(所有用户) + * + * @param payload 消息体 + */ @Override public void publishAll(PushPayloadDTO payload) { PushDTO dto = new PushDTO(); @@ -134,14 +198,33 @@ public class WebSocketSessionManager implements PushSessionManager { publishMessage(dto); } + /** + * 发送心跳Pong消息 + * 用于维持WebSocket长连接存活 + * + * @param session WebSocket会话 + */ public void sendPongMessage(WebSocketSession session) { sendMessage(session, new PongMessage()); } + /** + * 发送文本消息 + * + * @param session WebSocket会话 + * @param message 文本内容 + */ public void sendMessage(WebSocketSession session, String message) { sendMessage(session, new TextMessage(message)); } + /** + * 底层消息发送方法 + * + * @param session 会话对象 + * @param message WebSocket消息对象 + * @return 发送是否成功 + */ private boolean sendMessage(WebSocketSession session, WebSocketMessage message) { if (session == null || !session.isOpen()) { log.warn("[send] session会话已经关闭"); @@ -156,6 +239,12 @@ public class WebSocketSessionManager implements PushSessionManager { } } + /** + * 安全关闭WebSocket会话 + * + * @param session 待关闭的会话 + * @param status 关闭状态码 + */ private void closeSession(WebSocketSession session, CloseStatus status) { if (session == null) { return; @@ -163,6 +252,7 @@ public class WebSocketSessionManager implements PushSessionManager { try { session.close(status); } catch (Exception ignored) { + // 关闭异常忽略,防止影响主流程 } } } diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/enums/MessageTransportEnum.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/enums/MessageTransportEnum.java index ee5660e1b..362d26645 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/enums/MessageTransportEnum.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/enums/MessageTransportEnum.java @@ -14,15 +14,40 @@ import java.util.Arrays; @AllArgsConstructor public enum MessageTransportEnum { + /** + * SSE 传输方式 + * 服务端推送事件,单向轻量传输 + */ SSE("sse"), + + /** + * WebSocket 传输方式 + * 全双工长连接,支持双向实时通信 + */ WEBSOCKET("websocket"); + /** + * 传输类型编码 + */ private final String code; + /** + * 判断传输方式是否匹配 + * + * @param transport 传输方式字符串 + * @return 是否匹配 + */ public boolean matches(String transport) { return code.equalsIgnoreCase(transport); } + /** + * 根据传输类型字符串获取枚举 + * 找不到则默认返回 SSE + * + * @param transport 传输方式字符串 + * @return 对应的消息传输枚举 + */ public static MessageTransportEnum of(String transport) { return Arrays.stream(values()) .filter(item -> item.matches(transport)) diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java index 1251d238d..5e86c3632 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java @@ -22,7 +22,8 @@ import java.io.IOException; import java.util.List; /** - * WebSocket Handler。 + * WebSocket 请求处理器 + * 处理WebSocket连接建立、消息接收、异常、断开等全生命周期事件 * * @author Lion Li */ @@ -30,17 +31,31 @@ import java.util.List; @Slf4j public class PlusWebSocketHandler extends AbstractWebSocketHandler { + /** + * WebSocket 会话管理器 + */ private final WebSocketSessionManager webSocketSessionManager; + /** + * 建立WebSocket连接后触发 + * 校验用户登录信息,注册会话 + * + * @param session WebSocket会话 + */ @Override public void afterConnectionEstablished(WebSocketSession session) throws IOException { + // 从会话属性中获取登录用户信息和Token LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY); String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY); + + // 校验用户信息是否为空,无效则直接关闭连接 if (ObjectUtil.hasNull(loginUser, token)) { session.close(CloseStatus.BAD_DATA); log.info("[connect] invalid token received. sessionId: {}", session.getId()); return; } + + // 并发安全包装会话,并注册到会话管理器 webSocketSessionManager.connect( loginUser.getUserId(), token, @@ -49,16 +64,27 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler { log.info("[connect] sessionId: {}, userId:{}, token:{}", session.getId(), loginUser.getUserId(), token); } + /** + * 处理客户端发送的文本消息 + * 支持心跳ping/pong,以及自定义消息转发 + * + * @param session WebSocket会话 + * @param message 文本消息 + */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) { LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY); if (ObjectUtil.isNull(loginUser)) { return; } + + // 心跳处理:客户端发送ping,服务端回复pong if (MessageConstants.PING.equalsIgnoreCase(message.getPayload())) { webSocketSessionManager.sendMessage(session, MessageConstants.PONG); return; } + + // 构建客户端自定义消息并发布 PushDTO dto = new PushDTO(); dto.setUserIds(List.of(loginUser.getUserId())); dto.setPayload(PushPayloadDTO.of( @@ -70,33 +96,55 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler { webSocketSessionManager.publishMessage(dto); } + /** + * 处理二进制消息(默认实现) + */ @Override protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { super.handleBinaryMessage(session, message); } + /** + * 处理Pong心跳响应 + * 维持长连接存活 + */ @Override protected void handlePongMessage(WebSocketSession session, PongMessage message) { webSocketSessionManager.sendPongMessage(session); } + /** + * 传输异常处理 + * 记录异常日志 + */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) { log.error("[transport error] sessionId: {}, exception:{}", session.getId(), exception.getMessage()); } + /** + * 连接关闭后触发 + * 注销用户会话 + */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY); String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY); + if (ObjectUtil.hasNull(loginUser, token)) { log.info("[disconnect] invalid token received. sessionId: {}", session.getId()); return; } + + // 从会话管理器中移除连接 webSocketSessionManager.disconnect(loginUser.getUserId(), token); log.info("[disconnect] sessionId: {}, userId:{}, token:{}", session.getId(), loginUser.getUserId(), token); } + /** + * 是否支持分片消息 + * 关闭:不支持分片传输 + */ @Override public boolean supportsPartialMessages() { return false; diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java index ed229542a..148733a51 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java @@ -19,14 +19,31 @@ import java.util.List; @NoArgsConstructor(access = AccessLevel.PRIVATE) public class PushHelper { + /** + * 发送指定用户文本消息 + * + * @param userId 目标用户ID + * @param message 文本消息内容 + */ public static void sendMessage(Long userId, String message) { sendMessage(userId, buildMessage(message)); } + /** + * 全局广播文本消息 + * + * @param message 文本消息内容 + */ public static void sendMessage(String message) { sendMessage(buildMessage(message)); } + /** + * 发送指定用户自定义消息体 + * + * @param userId 目标用户ID + * @param payload 消息推送体 + */ public static void sendMessage(Long userId, PushPayloadDTO payload) { if (!isEnabled()) { return; @@ -34,6 +51,11 @@ public class PushHelper { getSessionManager().sendMessage(userId, payload); } + /** + * 全局广播自定义消息体 + * + * @param payload 消息推送体 + */ public static void sendMessage(PushPayloadDTO payload) { if (!isEnabled()) { return; @@ -41,6 +63,12 @@ public class PushHelper { getSessionManager().sendMessage(payload); } + /** + * 批量发布消息给指定用户列表 + * + * @param userIds 用户ID集合 + * @param payload 消息推送体 + */ public static void publishMessage(List userIds, PushPayloadDTO payload) { PushDTO dto = new PushDTO(); dto.setUserIds(userIds); @@ -48,6 +76,11 @@ public class PushHelper { publishMessage(dto); } + /** + * 批量发布消息(使用完整推送DTO) + * + * @param dto 推送参数封装对象 + */ public static void publishMessage(PushDTO dto) { if (!isEnabled() || dto == null || dto.getPayload() == null) { return; @@ -55,10 +88,20 @@ public class PushHelper { getSessionManager().publishMessage(dto); } + /** + * 发布全局广播文本消息 + * + * @param message 文本消息内容 + */ public static void publishAll(String message) { publishAll(buildMessage(message)); } + /** + * 发布全局广播自定义消息体 + * + * @param payload 消息推送体 + */ public static void publishAll(PushPayloadDTO payload) { if (!isEnabled()) { return; @@ -66,15 +109,33 @@ public class PushHelper { getSessionManager().publishAll(payload); } + /** + * 判断消息推送功能是否开启 + * 读取配置:message.enabled + * + * @return 是否开启推送 + */ public static boolean isEnabled() { return Boolean.TRUE.equals(SpringUtils.getProperty("message.enabled", Boolean.class, Boolean.TRUE)); } + /** + * 获取推送会话管理器Bean + * + * @return PushSessionManager 实例 + */ private static PushSessionManager getSessionManager() { return SpringUtils.getBean(PushSessionManager.class); } + /** + * 构建默认格式的消息推送体 + * + * @param message 消息内容 + * @return 封装好的 PushPayloadDTO + */ private static PushPayloadDTO buildMessage(String message) { return PushPayloadDTO.of(PushTypeEnum.MESSAGE, PushSourceEnum.BACKEND, message, null); } + } diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java index a830be5f9..8d0630f20 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java @@ -14,6 +14,7 @@ import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.Map; + /** * WebSocket 握手拦截器。 * @@ -22,36 +23,54 @@ import java.util.Map; @Slf4j public class PlusWebSocketInterceptor implements HandshakeInterceptor { + /** + * 握手前拦截(核心认证逻辑) + * 校验登录状态、Token、客户端ID,认证通过才允许建立 WebSocket 连接 + * + * @param attributes 用于传递到 WebSocketSession 的属性集合 + * @return 是否允许握手(true=允许,false=拒绝) + */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) { try { + // 1. 获取当前登录用户与 Token LoginUser loginUser = LoginHelper.getLoginUser(); String tokenValue = StpUtil.getTokenValue(); + + // 2. 未登录直接拒绝握手 if (loginUser == null || StringUtils.isBlank(tokenValue)) { return false; } + // 3. 校验客户端ID(防止多端冒用) String headerCid = ServletUtils.getRequest().getHeader(LoginHelper.CLIENT_KEY); String paramCid = ServletUtils.getParameter(LoginHelper.CLIENT_KEY); String clientId = StpUtil.getExtra(LoginHelper.CLIENT_KEY).toString(); + + // 客户端ID必须与请求头/参数中的一致,否则拒绝连接 if (!StringUtils.equalsAny(clientId, headerCid, paramCid)) { throw NotLoginException.newInstance(StpUtil.getLoginType(), "-100", "客户端ID与Token不匹配", StpUtil.getTokenValue()); } + // 4. 认证通过,将用户信息存入会话属性,供后续 WebSocketHandler 使用 attributes.put(MessageConstants.LOGIN_USER_KEY, loginUser); attributes.put(MessageConstants.LOGIN_TOKEN_KEY, tokenValue); return true; } catch (NotLoginException e) { + // 认证失败,记录日志并拒绝连接 log.error("WebSocket 认证失败'{}',无法访问系统资源", e.getMessage()); return false; } } + /** + * 握手完成后触发 + * 此处无需处理,留空即可 + */ @Override - public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, - Exception exception) { + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } } diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java index 4f1bbad7a..a0cb451a4 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java @@ -17,10 +17,20 @@ import org.springframework.core.Ordered; @RequiredArgsConstructor public class MessageTopicListener implements ApplicationRunner, Ordered { + /** + * 推送会话管理器 + */ private final PushSessionManager pushSessionManager; + /** + * 项目启动后执行 + * 注册消息订阅,监听消息并分发给对应用户/全局广播 + * + * @param args 启动参数 + */ @Override public void run(ApplicationArguments args) { + // 订阅消息主题,处理消息分发 pushSessionManager.subscribeMessage(message -> { log.info("消息主题订阅收到消息userIds={} message={}", message.getUserIds(), @@ -28,15 +38,22 @@ public class MessageTopicListener implements ApplicationRunner, Ordered { if (message.getPayload() == null) { return; } + // 有指定用户 -> 单发 if (CollUtil.isNotEmpty(message.getUserIds())) { message.getUserIds().forEach(userId -> pushSessionManager.sendMessage(userId, message.getPayload())); } else { + // 无指定用户 -> 全局广播 pushSessionManager.sendMessage(message.getPayload()); } }); log.info("初始化消息主题订阅监听器成功"); } + /** + * 执行顺序,优先级设为最高,确保消息订阅最先初始化 + * + * @return 优先级,值越小越先执行 + */ @Override public int getOrder() { return -1; diff --git a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/ISysMessageService.java b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/ISysMessageService.java index d4acaed3e..5787c8702 100644 --- a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/ISysMessageService.java +++ b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/ISysMessageService.java @@ -14,6 +14,7 @@ public interface ISysMessageService { /** * 查询当前用户消息盒子数据 + * 按系统消息、通知公告、工作流消息分类返回 * * @param userId 用户ID * @return 消息盒子数据 @@ -23,69 +24,70 @@ public interface ISysMessageService { /** * 发送指定用户文本消息 * - * @param userId 目标用户 - * @param message 文本消息 + * @param userId 目标用户ID + * @param message 文本消息内容 */ void sendMessage(Long userId, String message); /** - * 广播文本消息 + * 全局广播文本消息 * - * @param message 文本消息 + * @param message 文本消息内容 */ void sendMessage(String message); /** - * 发送指定用户消息 + * 发送指定用户自定义消息体 * - * @param userId 目标用户 - * @param payload 推送消息体 + * @param userId 目标用户ID + * @param payload 消息推送体 */ void sendMessage(Long userId, PushPayloadDTO payload); /** - * 广播消息 + * 全局广播自定义消息体 * - * @param payload 推送消息体 + * @param payload 消息推送体 */ void sendMessage(PushPayloadDTO payload); /** - * 发布指定用户消息 + * 批量发布消息给指定用户列表 * - * @param userIds 用户ID列表 - * @param payload 推送消息体 + * @param userIds 用户ID集合 + * @param payload 消息推送体 */ void publishMessage(List userIds, PushPayloadDTO payload); /** - * 发布广播文本消息 + * 发布全局广播文本消息 * - * @param message 文本消息 + * @param message 文本消息内容 */ void publishAll(String message); /** - * 发布广播消息 + * 发布全局广播自定义消息体 * - * @param payload 推送消息体 + * @param payload 消息推送体 */ void publishAll(PushPayloadDTO payload); /** - * 记录全局消息 + * 存储全局广播消息到数据库 * - * @param payload 推送消息体 - * @return 回填消息ID后的推送消息体 + * @param payload 消息推送体 + * @return 回填消息ID后的消息体 */ PushPayloadDTO storeAll(PushPayloadDTO payload); /** - * 记录指定用户消息 + * 存储指定用户消息到数据库 * - * @param userIds 用户ID列表 - * @param payload 推送消息体 - * @return 回填消息ID后的推送消息体 + * @param userIds 用户ID集合 + * @param payload 消息推送体 + * @return 回填消息ID后的消息体 */ PushPayloadDTO storeUsers(List userIds, PushPayloadDTO payload); + } diff --git a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/SysMessageServiceImpl.java b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/SysMessageServiceImpl.java index c3fcc4625..beb33e5dc 100644 --- a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/SysMessageServiceImpl.java +++ b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/SysMessageServiceImpl.java @@ -37,15 +37,45 @@ import java.util.concurrent.TimeUnit; @Service public class SysMessageServiceImpl implements ISysMessageService, MessageService { + /** + * 全局广播用户标识(所有用户可见) + */ private static final String GLOBAL_USER_IDS = "0"; + + /** + * 消息分类:系统消息 + */ private static final String CATEGORY_SYSTEM = "system"; + + /** + * 消息分类:通知公告 + */ private static final String CATEGORY_NOTICE = "notice"; + + /** + * 消息分类:工作流 + */ private static final String CATEGORY_WORKFLOW = "workflow"; + + /** + * 消息盒子每页展示最大条数 + */ private static final int BOX_LIMIT = 100; + + /** + * 消息盒子展示消息天数(仅展示30天内) + */ private static final long BOX_DAYS = 30L; private final SysMessageMapper baseMapper; + /** + * 查询当前用户消息盒子数据 + * 按系统消息、通知公告、工作流消息分类返回 + * + * @param userId 用户ID + * @return 分类消息盒子数据 + */ @Override public SysMessageBoxVo queryMessageBox(Long userId) { SysMessageBoxVo box = new SysMessageBoxVo(); @@ -55,51 +85,110 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService return box; } + /** + * 发送指定用户文本消息 + * + * @param userId 目标用户ID + * @param message 文本消息内容 + */ @Override public void sendMessage(Long userId, String message) { PushHelper.sendMessage(userId, buildDefaultMessage(message)); } + /** + * 全局广播文本消息 + * + * @param message 文本消息内容 + */ @Override public void sendMessage(String message) { PushHelper.sendMessage(buildDefaultMessage(message)); } + /** + * 发送指定用户自定义消息体 + * + * @param userId 目标用户ID + * @param payload 消息推送体 + */ @Override public void sendMessage(Long userId, PushPayloadDTO payload) { PushHelper.sendMessage(userId, payload); } + /** + * 全局广播自定义消息体 + * + * @param payload 消息推送体 + */ @Override public void sendMessage(PushPayloadDTO payload) { PushHelper.sendMessage(payload); } + /** + * 批量发布消息给指定用户列表 + * + * @param userIds 用户ID集合 + * @param payload 消息推送体 + */ @Override public void publishMessage(List userIds, PushPayloadDTO payload) { PushHelper.publishMessage(userIds, storeUsers(userIds, payload)); } + /** + * 发布全局广播文本消息 + * + * @param message 文本消息内容 + */ @Override public void publishAll(String message) { publishAll(buildDefaultMessage(message)); } + /** + * 发布全局广播自定义消息体 + * + * @param payload 消息推送体 + */ @Override public void publishAll(PushPayloadDTO payload) { PushHelper.publishAll(storeAll(payload)); } + /** + * 存储全局广播消息到数据库 + * + * @param payload 消息推送体 + * @return 回填消息ID后的消息体 + */ @Override public PushPayloadDTO storeAll(PushPayloadDTO payload) { return storeMessage(null, payload); } + /** + * 存储指定用户消息到数据库 + * + * @param userIds 用户ID集合 + * @param payload 消息推送体 + * @return 回填消息ID后的消息体 + */ @Override public PushPayloadDTO storeUsers(List userIds, PushPayloadDTO payload) { return storeMessage(userIds, payload); } + /** + * 统一消息存储逻辑 + * 判断是否需要存入消息盒子,需要则插入数据库 + * + * @param userIds 用户ID集合(为null则全局广播) + * @param payload 消息推送体 + * @return 回填消息ID后的消息体 + */ private PushPayloadDTO storeMessage(List userIds, PushPayloadDTO payload) { if (!supportsMessageBox(payload)) { return payload; @@ -110,20 +199,42 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService return payload; } + /** + * 根据分类和用户ID查询消息列表 + * 仅查询30天内、最多100条、按时间倒序 + * + * @param category 消息分类 + * @param userId 用户ID + * @return 消息VO列表 + */ private List selectMessageList(String category, Long userId) { LambdaQueryWrapper lqw = Wrappers.lambdaQuery(); + // 分类匹配 lqw.eq(SysMessage::getCategory, category); + // 仅查询30天内消息 lqw.ge(SysMessage::getCreateTime, new Date(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(BOX_DAYS))); + // 全局消息 或 当前用户在接收人范围内 lqw.and(wrapper -> wrapper.eq(SysMessage::getSendUserIds, GLOBAL_USER_IDS) .or() .apply(DataBaseHelper.findInSet(userId, "send_user_ids"))); + // 按创建时间+消息ID倒序 lqw.orderByDesc(SysMessage::getCreateTime, SysMessage::getMessageId); + // 分页查询(只查第一页,最多100条) List list = baseMapper.selectList(new Page<>(1, BOX_LIMIT, false), lqw); + // 转换为VO并返回 return list.stream().map(this::buildVo).toList(); } + /** + * 构建消息实体(用于数据库存储) + * + * @param userIds 接收用户ID集合 + * @param payload 消息推送体 + * @return 系统消息实体 + */ private SysMessage buildMessage(List userIds, PushPayloadDTO payload) { SysMessage message = new SysMessage(); + // 设置消息ID(无则自动生成) message.setMessageId(payload.getMessageId() == null ? IdGeneratorUtil.nextLongId() : payload.getMessageId()); message.setCategory(resolveCategory(payload)); message.setType(payload.getType()); @@ -133,20 +244,35 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService message.setContent(resolveContent(payload)); message.setDataJson(JsonUtils.toJsonString(payload.getData())); message.setPath(payload.getPath()); + // 设置接收人(无则为全局广播) message.setSendUserIds(CollUtil.isEmpty(userIds) ? GLOBAL_USER_IDS : StringUtils.joinComma(userIds)); return message; } + /** + * 消息实体转换为展示VO + * + * @param entity 消息实体 + * @return 消息展示VO + */ private SysMessageVo buildVo(SysMessage entity) { SysMessageVo vo = MapstructUtils.convert(entity, SysMessageVo.class); vo.setData(parseData(entity.getDataJson())); return vo; } + /** + * 判断消息是否需要存入消息盒子 + * 仅系统消息、通知消息需要存入 + * + * @param payload 消息推送体 + * @return 是否支持存入消息盒子 + */ private boolean supportsMessageBox(PushPayloadDTO payload) { if (payload == null) { return false; } + // 仅消息/通知类型需要存入,排除LLM大模型消息 if (StringUtils.equalsAny(payload.getType(), PushTypeEnum.MESSAGE.getType(), PushTypeEnum.NOTICE.getType())) { return !StringUtils.equalsAny(payload.getType(), PushTypeEnum.LLM.getType()) && !StringUtils.equalsAny(payload.getSource(), PushSourceEnum.LLM.getSource()); @@ -154,6 +280,12 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService return false; } + /** + * 根据消息类型/来源自动解析消息分类 + * + * @param payload 消息推送体 + * @return 消息分类(system/notice/workflow) + */ private String resolveCategory(PushPayloadDTO payload) { if (StringUtils.equalsAny(payload.getType(), PushTypeEnum.NOTICE.getType()) || StringUtils.equalsAny(payload.getSource(), PushSourceEnum.NOTICE.getSource())) { @@ -165,6 +297,12 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService return CATEGORY_SYSTEM; } + /** + * 根据消息分类自动生成消息标题 + * + * @param payload 消息推送体 + * @return 消息标题 + */ private String resolveTitle(PushPayloadDTO payload) { return switch (resolveCategory(payload)) { case CATEGORY_NOTICE -> "通知公告消息"; @@ -173,6 +311,12 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService }; } + /** + * 解析消息内容(从data中提取noticeContent) + * + * @param payload 消息推送体 + * @return 消息内容 + */ private String resolveContent(PushPayloadDTO payload) { Object data = payload.getData(); if (data instanceof Map map) { @@ -181,6 +325,12 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService return null; } + /** + * 解析JSON数据字符串为对象 + * + * @param dataJson JSON字符串 + * @return 解析后对象 + */ private Object parseData(String dataJson) { if (StringUtils.isBlank(dataJson)) { return null; @@ -188,6 +338,12 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService return JsonUtils.parseObject(dataJson, Object.class); } + /** + * 构建默认格式的消息体 + * + * @param message 消息内容 + * @return 默认消息体 + */ private PushPayloadDTO buildDefaultMessage(String message) { return PushPayloadDTO.of(PushTypeEnum.MESSAGE, PushSourceEnum.BACKEND, message, null); }