update 增加消息推送模块注释

This commit is contained in:
AprilWind
2026-03-30 14:36:54 +08:00
parent 8300f65640
commit 920d717cd0
14 changed files with 566 additions and 27 deletions

View File

@@ -11,17 +11,56 @@ import java.util.List;
*/
public interface MessageService {
/**
* 发送指定用户文本消息
*
* @param userId 目标用户ID
* @param message 文本消息内容
*/
void sendMessage(Long userId, String message);
/**
* 全局广播文本消息
*
* @param message 文本消息内容
*/
void sendMessage(String message);
/**
* 发送指定用户自定义消息体
*
* @param userId 目标用户ID
* @param payload 消息推送体
*/
void sendMessage(Long userId, PushPayloadDTO payload);
/**
* 全局广播自定义消息体
*
* @param payload 消息推送体
*/
void sendMessage(PushPayloadDTO payload);
/**
* 批量发布消息给指定用户列表
*
* @param userIds 用户ID集合
* @param payload 消息推送体
*/
void publishMessage(List<Long> userIds, PushPayloadDTO payload);
/**
* 发布全局广播文本消息
*
* @param message 文本消息内容
*/
void publishAll(String message);
/**
* 发布全局广播自定义消息体
*
* @param payload 消息推送体
*/
void publishAll(PushPayloadDTO payload);
}

View File

@@ -12,7 +12,7 @@
<artifactId>ruoyi-common-push</artifactId>
<description>
ruoyi-common-push 模块
ruoyi-common-push 消息推送模块
</description>
<dependencies>

View File

@@ -16,16 +16,36 @@ import org.springframework.context.annotation.Bean;
@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "sse", matchIfMissing = true)
public class MessageSseConfiguration {
/**
* 注册 SSE 会话管理器
* 负责管理用户 SSE 连接、消息发送、会话清理
*
* @return SseEmitterSessionManager 实例
*/
@Bean
public SseEmitterSessionManager sseEmitterManager() {
return new SseEmitterSessionManager();
}
/**
* 注册消息主题监听器
* 监听 Redis 全局消息,用于集群环境下的消息分发
*
* @param manager SSE 会话管理器
* @return MessageTopicListener 实例
*/
@Bean
public MessageTopicListener messageTopicListener(SseEmitterSessionManager manager) {
return new MessageTopicListener(manager);
}
/**
* 注册 SSE 控制器
* 提供前端建立 SSE 连接的接口
*
* @param manager SSE 会话管理器
* @return SseController 实例
*/
@Bean
public SseController sseController(SseEmitterSessionManager manager) {
return new SseController(manager);

View File

@@ -23,6 +23,10 @@ import org.springframework.web.socket.server.HandshakeInterceptor;
@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "websocket")
public class MessageWebSocketConfiguration {
/**
* WebSocket 配置注册
* 配置连接路径、拦截器、跨域
*/
@Bean
public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor,
WebSocketHandler webSocketHandler,
@@ -33,21 +37,37 @@ public class MessageWebSocketConfiguration {
.setAllowedOrigins(messageProperties.getAllowedOrigins());
}
/**
* WebSocket 会话管理器
* 负责连接管理、消息发送、定时清理失效会话
*/
@Bean
public WebSocketSessionManager webSocketSessionManager() {
return new WebSocketSessionManager();
}
/**
* WebSocket 握手拦截器
* 建立连接前做登录校验、客户端ID校验
*/
@Bean
public HandshakeInterceptor handshakeInterceptor() {
return new PlusWebSocketInterceptor();
}
/**
* WebSocket 消息处理器
* 处理连接、消息、心跳、断开、异常等事件
*/
@Bean
public WebSocketHandler webSocketHandler(WebSocketSessionManager webSocketSessionManager) {
return new PlusWebSocketHandler(webSocketSessionManager);
}
/**
* 消息主题监听器
* 订阅 Redis 消息,实现集群环境下的消息分发
*/
@Bean
public MessageTopicListener messageTopicListener(WebSocketSessionManager webSocketSessionManager) {
return new MessageTopicListener(webSocketSessionManager);

View File

@@ -7,13 +7,28 @@ package org.dromara.common.push.constant;
*/
public interface MessageConstants {
/**
* 登录用户信息
*/
String LOGIN_USER_KEY = "loginUser";
/**
* 登录令牌
*/
String LOGIN_TOKEN_KEY = "token";
/**
* 全局消息订阅主题
*/
String MESSAGE_TOPIC = "global:message";
/**
* 心跳请求标识
*/
String PING = "ping";
/**
* 心跳响应标识
*/
String PONG = "pong";
}

View File

@@ -12,13 +12,40 @@ import java.util.function.Consumer;
*/
public interface PushSessionManager {
/**
* 订阅消息通道
* 注册消息消费者,用于监听并处理消息推送事件
*
* @param consumer 消息消费逻辑
*/
void subscribeMessage(Consumer<PushDTO> consumer);
/**
* 发送消息给指定用户
*
* @param userId 目标用户ID
* @param payload 消息体
*/
void sendMessage(Long userId, PushPayloadDTO payload);
/**
* 全局广播消息(所有在线用户)
*
* @param payload 消息体
*/
void sendMessage(PushPayloadDTO payload);
/**
* 批量发布消息给指定用户列表
*
* @param pushDTO 推送参数封装对象
*/
void publishMessage(PushDTO pushDTO);
/**
* 全局广播消息(所有用户)
*
* @param payload 消息体
*/
void publishAll(PushPayloadDTO payload);
}

View File

@@ -33,20 +33,44 @@ import static org.dromara.common.push.constant.MessageConstants.MESSAGE_TOPIC;
@Slf4j
public class WebSocketSessionManager implements PushSessionManager {
/**
* 用户会话存储集合
* 结构userId -> (token -> WebSocketSession)
* 支持同一用户多终端、多设备同时在线
*/
private static final Map<Long, Map<String, WebSocketSession>> USER_TOKEN_SESSIONS = new ConcurrentHashMap<>();
/**
* 构造函数
* 初始化定时任务每60秒执行一次会话监控自动清理无效连接
*/
public WebSocketSessionManager() {
SpringUtils.getBean(ScheduledExecutorService.class)
.scheduleWithFixedDelay(this::sessionMonitor, 60L, 60L, TimeUnit.SECONDS);
}
/**
* 用户建立WebSocket连接
*
* @param userId 用户ID
* @param token 客户端唯一标识(区分不同设备/终端)
* @param session WebSocket会话对象
*/
public void connect(Long userId, String token, WebSocketSession session) {
Map<String, WebSocketSession> sessions = USER_TOKEN_SESSIONS.computeIfAbsent(userId, key -> new ConcurrentHashMap<>());
// 移除并关闭旧的同token会话避免重复连接
WebSocketSession oldSession = sessions.remove(token);
closeSession(oldSession, CloseStatus.NORMAL);
// 存储新会话
sessions.put(token, session);
}
/**
* 用户断开WebSocket连接
*
* @param userId 用户ID
* @param token 客户端唯一标识
*/
public void disconnect(Long userId, String token) {
if (userId == null || token == null) {
return;
@@ -56,12 +80,18 @@ public class WebSocketSessionManager implements PushSessionManager {
USER_TOKEN_SESSIONS.remove(userId);
return;
}
// 移除指定token会话并关闭
closeSession(sessions.remove(token), CloseStatus.NORMAL);
// 该用户无任何会话时,从缓存中移除
if (sessions.isEmpty()) {
USER_TOKEN_SESSIONS.remove(userId);
}
}
/**
* 会话监控定时任务
* 定期清理已关闭、失效的WebSocket会话防止内存泄漏
*/
public void sessionMonitor() {
List<Long> toRemoveUsers = new ArrayList<>();
USER_TOKEN_SESSIONS.forEach((userId, sessionMap) -> {
@@ -69,6 +99,7 @@ public class WebSocketSessionManager implements PushSessionManager {
toRemoveUsers.add(userId);
return;
}
// 移除已关闭的无效会话
sessionMap.entrySet().removeIf(entry -> {
WebSocketSession session = entry.getValue();
if (session == null || !session.isOpen()) {
@@ -77,18 +108,32 @@ public class WebSocketSessionManager implements PushSessionManager {
}
return false;
});
// 无有效会话,标记用户待删除
if (sessionMap.isEmpty()) {
toRemoveUsers.add(userId);
}
});
// 批量清理无会话用户
toRemoveUsers.forEach(USER_TOKEN_SESSIONS::remove);
}
/**
* 订阅消息通道
* 注册消息消费者监听Redis消息推送
*
* @param consumer 消息消费逻辑
*/
@Override
public void subscribeMessage(Consumer<PushDTO> consumer) {
RedisUtils.subscribe(MESSAGE_TOPIC, PushDTO.class, consumer);
}
/**
* 向指定用户发送消息
*
* @param userId 目标用户ID
* @param payload 消息体
*/
@Override
public void sendMessage(Long userId, PushPayloadDTO payload) {
if (payload == null) {
@@ -99,24 +144,38 @@ public class WebSocketSessionManager implements PushSessionManager {
USER_TOKEN_SESSIONS.remove(userId);
return;
}
// 发送消息并自动清理失效会话
sessions.entrySet().removeIf(entry -> {
WebSocketSession session = entry.getValue();
if (session == null || !session.isOpen()) {
closeSession(session, CloseStatus.NORMAL);
return true;
}
// 发送失败的会话也会被移除
return !sendMessage(session, new TextMessage(JsonUtils.toJsonString(payload)));
});
// 无有效会话则移除用户
if (sessions.isEmpty()) {
USER_TOKEN_SESSIONS.remove(userId);
}
}
/**
* 向所有在线用户广播消息
*
* @param payload 消息体
*/
@Override
public void sendMessage(PushPayloadDTO payload) {
USER_TOKEN_SESSIONS.keySet().forEach(userId -> sendMessage(userId, payload));
}
/**
* 发布消息到Redis订阅通道
* 支持集群环境下的分布式消息推送
*
* @param pushDTO 推送消息封装对象
*/
@Override
public void publishMessage(PushDTO pushDTO) {
RedisUtils.publish(MESSAGE_TOPIC, pushDTO, consumer -> log.info(
@@ -127,6 +186,11 @@ public class WebSocketSessionManager implements PushSessionManager {
));
}
/**
* 全局广播消息(所有用户)
*
* @param payload 消息体
*/
@Override
public void publishAll(PushPayloadDTO payload) {
PushDTO dto = new PushDTO();
@@ -134,14 +198,33 @@ public class WebSocketSessionManager implements PushSessionManager {
publishMessage(dto);
}
/**
* 发送心跳Pong消息
* 用于维持WebSocket长连接存活
*
* @param session WebSocket会话
*/
public void sendPongMessage(WebSocketSession session) {
sendMessage(session, new PongMessage());
}
/**
* 发送文本消息
*
* @param session WebSocket会话
* @param message 文本内容
*/
public void sendMessage(WebSocketSession session, String message) {
sendMessage(session, new TextMessage(message));
}
/**
* 底层消息发送方法
*
* @param session 会话对象
* @param message WebSocket消息对象
* @return 发送是否成功
*/
private boolean sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
if (session == null || !session.isOpen()) {
log.warn("[send] session会话已经关闭");
@@ -156,6 +239,12 @@ public class WebSocketSessionManager implements PushSessionManager {
}
}
/**
* 安全关闭WebSocket会话
*
* @param session 待关闭的会话
* @param status 关闭状态码
*/
private void closeSession(WebSocketSession session, CloseStatus status) {
if (session == null) {
return;
@@ -163,6 +252,7 @@ public class WebSocketSessionManager implements PushSessionManager {
try {
session.close(status);
} catch (Exception ignored) {
// 关闭异常忽略,防止影响主流程
}
}
}

View File

@@ -14,15 +14,40 @@ import java.util.Arrays;
@AllArgsConstructor
public enum MessageTransportEnum {
/**
* SSE 传输方式
* 服务端推送事件,单向轻量传输
*/
SSE("sse"),
/**
* WebSocket 传输方式
* 全双工长连接,支持双向实时通信
*/
WEBSOCKET("websocket");
/**
* 传输类型编码
*/
private final String code;
/**
* 判断传输方式是否匹配
*
* @param transport 传输方式字符串
* @return 是否匹配
*/
public boolean matches(String transport) {
return code.equalsIgnoreCase(transport);
}
/**
* 根据传输类型字符串获取枚举
* 找不到则默认返回 SSE
*
* @param transport 传输方式字符串
* @return 对应的消息传输枚举
*/
public static MessageTransportEnum of(String transport) {
return Arrays.stream(values())
.filter(item -> item.matches(transport))

View File

@@ -22,7 +22,8 @@ import java.io.IOException;
import java.util.List;
/**
* WebSocket Handler。
* WebSocket 请求处理器
* 处理WebSocket连接建立、消息接收、异常、断开等全生命周期事件
*
* @author Lion Li
*/
@@ -30,17 +31,31 @@ import java.util.List;
@Slf4j
public class PlusWebSocketHandler extends AbstractWebSocketHandler {
/**
* WebSocket 会话管理器
*/
private final WebSocketSessionManager webSocketSessionManager;
/**
* 建立WebSocket连接后触发
* 校验用户登录信息,注册会话
*
* @param session WebSocket会话
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws IOException {
// 从会话属性中获取登录用户信息和Token
LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY);
String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY);
// 校验用户信息是否为空,无效则直接关闭连接
if (ObjectUtil.hasNull(loginUser, token)) {
session.close(CloseStatus.BAD_DATA);
log.info("[connect] invalid token received. sessionId: {}", session.getId());
return;
}
// 并发安全包装会话,并注册到会话管理器
webSocketSessionManager.connect(
loginUser.getUserId(),
token,
@@ -49,16 +64,27 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler {
log.info("[connect] sessionId: {}, userId:{}, token:{}", session.getId(), loginUser.getUserId(), token);
}
/**
* 处理客户端发送的文本消息
* 支持心跳ping/pong以及自定义消息转发
*
* @param session WebSocket会话
* @param message 文本消息
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY);
if (ObjectUtil.isNull(loginUser)) {
return;
}
// 心跳处理客户端发送ping服务端回复pong
if (MessageConstants.PING.equalsIgnoreCase(message.getPayload())) {
webSocketSessionManager.sendMessage(session, MessageConstants.PONG);
return;
}
// 构建客户端自定义消息并发布
PushDTO dto = new PushDTO();
dto.setUserIds(List.of(loginUser.getUserId()));
dto.setPayload(PushPayloadDTO.of(
@@ -70,33 +96,55 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler {
webSocketSessionManager.publishMessage(dto);
}
/**
* 处理二进制消息(默认实现)
*/
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
super.handleBinaryMessage(session, message);
}
/**
* 处理Pong心跳响应
* 维持长连接存活
*/
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) {
webSocketSessionManager.sendPongMessage(session);
}
/**
* 传输异常处理
* 记录异常日志
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
log.error("[transport error] sessionId: {}, exception:{}", session.getId(), exception.getMessage());
}
/**
* 连接关闭后触发
* 注销用户会话
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY);
String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY);
if (ObjectUtil.hasNull(loginUser, token)) {
log.info("[disconnect] invalid token received. sessionId: {}", session.getId());
return;
}
// 从会话管理器中移除连接
webSocketSessionManager.disconnect(loginUser.getUserId(), token);
log.info("[disconnect] sessionId: {}, userId:{}, token:{}", session.getId(), loginUser.getUserId(), token);
}
/**
* 是否支持分片消息
* 关闭:不支持分片传输
*/
@Override
public boolean supportsPartialMessages() {
return false;

View File

@@ -19,14 +19,31 @@ import java.util.List;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class PushHelper {
/**
* 发送指定用户文本消息
*
* @param userId 目标用户ID
* @param message 文本消息内容
*/
public static void sendMessage(Long userId, String message) {
sendMessage(userId, buildMessage(message));
}
/**
* 全局广播文本消息
*
* @param message 文本消息内容
*/
public static void sendMessage(String message) {
sendMessage(buildMessage(message));
}
/**
* 发送指定用户自定义消息体
*
* @param userId 目标用户ID
* @param payload 消息推送体
*/
public static void sendMessage(Long userId, PushPayloadDTO payload) {
if (!isEnabled()) {
return;
@@ -34,6 +51,11 @@ public class PushHelper {
getSessionManager().sendMessage(userId, payload);
}
/**
* 全局广播自定义消息体
*
* @param payload 消息推送体
*/
public static void sendMessage(PushPayloadDTO payload) {
if (!isEnabled()) {
return;
@@ -41,6 +63,12 @@ public class PushHelper {
getSessionManager().sendMessage(payload);
}
/**
* 批量发布消息给指定用户列表
*
* @param userIds 用户ID集合
* @param payload 消息推送体
*/
public static void publishMessage(List<Long> userIds, PushPayloadDTO payload) {
PushDTO dto = new PushDTO();
dto.setUserIds(userIds);
@@ -48,6 +76,11 @@ public class PushHelper {
publishMessage(dto);
}
/**
* 批量发布消息使用完整推送DTO
*
* @param dto 推送参数封装对象
*/
public static void publishMessage(PushDTO dto) {
if (!isEnabled() || dto == null || dto.getPayload() == null) {
return;
@@ -55,10 +88,20 @@ public class PushHelper {
getSessionManager().publishMessage(dto);
}
/**
* 发布全局广播文本消息
*
* @param message 文本消息内容
*/
public static void publishAll(String message) {
publishAll(buildMessage(message));
}
/**
* 发布全局广播自定义消息体
*
* @param payload 消息推送体
*/
public static void publishAll(PushPayloadDTO payload) {
if (!isEnabled()) {
return;
@@ -66,15 +109,33 @@ public class PushHelper {
getSessionManager().publishAll(payload);
}
/**
* 判断消息推送功能是否开启
* 读取配置message.enabled
*
* @return 是否开启推送
*/
public static boolean isEnabled() {
return Boolean.TRUE.equals(SpringUtils.getProperty("message.enabled", Boolean.class, Boolean.TRUE));
}
/**
* 获取推送会话管理器Bean
*
* @return PushSessionManager 实例
*/
private static PushSessionManager getSessionManager() {
return SpringUtils.getBean(PushSessionManager.class);
}
/**
* 构建默认格式的消息推送体
*
* @param message 消息内容
* @return 封装好的 PushPayloadDTO
*/
private static PushPayloadDTO buildMessage(String message) {
return PushPayloadDTO.of(PushTypeEnum.MESSAGE, PushSourceEnum.BACKEND, message, null);
}
}

View File

@@ -14,6 +14,7 @@ import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* WebSocket 握手拦截器。
*
@@ -22,36 +23,54 @@ import java.util.Map;
@Slf4j
public class PlusWebSocketInterceptor implements HandshakeInterceptor {
/**
* 握手前拦截(核心认证逻辑)
* 校验登录状态、Token、客户端ID认证通过才允许建立 WebSocket 连接
*
* @param attributes 用于传递到 WebSocketSession 的属性集合
* @return 是否允许握手true=允许false=拒绝)
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) {
try {
// 1. 获取当前登录用户与 Token
LoginUser loginUser = LoginHelper.getLoginUser();
String tokenValue = StpUtil.getTokenValue();
// 2. 未登录直接拒绝握手
if (loginUser == null || StringUtils.isBlank(tokenValue)) {
return false;
}
// 3. 校验客户端ID防止多端冒用
String headerCid = ServletUtils.getRequest().getHeader(LoginHelper.CLIENT_KEY);
String paramCid = ServletUtils.getParameter(LoginHelper.CLIENT_KEY);
String clientId = StpUtil.getExtra(LoginHelper.CLIENT_KEY).toString();
// 客户端ID必须与请求头/参数中的一致,否则拒绝连接
if (!StringUtils.equalsAny(clientId, headerCid, paramCid)) {
throw NotLoginException.newInstance(StpUtil.getLoginType(),
"-100", "客户端ID与Token不匹配",
StpUtil.getTokenValue());
}
// 4. 认证通过,将用户信息存入会话属性,供后续 WebSocketHandler 使用
attributes.put(MessageConstants.LOGIN_USER_KEY, loginUser);
attributes.put(MessageConstants.LOGIN_TOKEN_KEY, tokenValue);
return true;
} catch (NotLoginException e) {
// 认证失败,记录日志并拒绝连接
log.error("WebSocket 认证失败'{}',无法访问系统资源", e.getMessage());
return false;
}
}
/**
* 握手完成后触发
* 此处无需处理,留空即可
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception exception) {
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}

View File

@@ -17,10 +17,20 @@ import org.springframework.core.Ordered;
@RequiredArgsConstructor
public class MessageTopicListener implements ApplicationRunner, Ordered {
/**
* 推送会话管理器
*/
private final PushSessionManager pushSessionManager;
/**
* 项目启动后执行
* 注册消息订阅,监听消息并分发给对应用户/全局广播
*
* @param args 启动参数
*/
@Override
public void run(ApplicationArguments args) {
// 订阅消息主题,处理消息分发
pushSessionManager.subscribeMessage(message -> {
log.info("消息主题订阅收到消息userIds={} message={}",
message.getUserIds(),
@@ -28,15 +38,22 @@ public class MessageTopicListener implements ApplicationRunner, Ordered {
if (message.getPayload() == null) {
return;
}
// 有指定用户 -> 单发
if (CollUtil.isNotEmpty(message.getUserIds())) {
message.getUserIds().forEach(userId -> pushSessionManager.sendMessage(userId, message.getPayload()));
} else {
// 无指定用户 -> 全局广播
pushSessionManager.sendMessage(message.getPayload());
}
});
log.info("初始化消息主题订阅监听器成功");
}
/**
* 执行顺序,优先级设为最高,确保消息订阅最先初始化
*
* @return 优先级,值越小越先执行
*/
@Override
public int getOrder() {
return -1;

View File

@@ -14,6 +14,7 @@ public interface ISysMessageService {
/**
* 查询当前用户消息盒子数据
* 按系统消息、通知公告、工作流消息分类返回
*
* @param userId 用户ID
* @return 消息盒子数据
@@ -23,69 +24,70 @@ public interface ISysMessageService {
/**
* 发送指定用户文本消息
*
* @param userId 目标用户
* @param message 文本消息
* @param userId 目标用户ID
* @param message 文本消息内容
*/
void sendMessage(Long userId, String message);
/**
* 广播文本消息
* 全局广播文本消息
*
* @param message 文本消息
* @param message 文本消息内容
*/
void sendMessage(String message);
/**
* 发送指定用户消息
* 发送指定用户自定义消息
*
* @param userId 目标用户
* @param payload 推送消息体
* @param userId 目标用户ID
* @param payload 消息推送
*/
void sendMessage(Long userId, PushPayloadDTO payload);
/**
* 广播消息
* 全局广播自定义消息
*
* @param payload 推送消息体
* @param payload 消息推送
*/
void sendMessage(PushPayloadDTO payload);
/**
* 发布指定用户消息
* 批量发布消息给指定用户列表
*
* @param userIds 用户ID列表
* @param payload 推送消息体
* @param userIds 用户ID集合
* @param payload 消息推送
*/
void publishMessage(List<Long> userIds, PushPayloadDTO payload);
/**
* 发布广播文本消息
* 发布全局广播文本消息
*
* @param message 文本消息
* @param message 文本消息内容
*/
void publishAll(String message);
/**
* 发布广播消息
* 发布全局广播自定义消息
*
* @param payload 推送消息体
* @param payload 消息推送
*/
void publishAll(PushPayloadDTO payload);
/**
* 记录全局消息
* 存储全局广播消息到数据库
*
* @param payload 推送消息体
* @return 回填消息ID后的推送消息体
* @param payload 消息推送
* @return 回填消息ID后的消息体
*/
PushPayloadDTO storeAll(PushPayloadDTO payload);
/**
* 记录指定用户消息
* 存储指定用户消息到数据库
*
* @param userIds 用户ID列表
* @param payload 推送消息体
* @return 回填消息ID后的推送消息体
* @param userIds 用户ID集合
* @param payload 消息推送
* @return 回填消息ID后的消息体
*/
PushPayloadDTO storeUsers(List<Long> userIds, PushPayloadDTO payload);
}

View File

@@ -37,15 +37,45 @@ import java.util.concurrent.TimeUnit;
@Service
public class SysMessageServiceImpl implements ISysMessageService, MessageService {
/**
* 全局广播用户标识(所有用户可见)
*/
private static final String GLOBAL_USER_IDS = "0";
/**
* 消息分类:系统消息
*/
private static final String CATEGORY_SYSTEM = "system";
/**
* 消息分类:通知公告
*/
private static final String CATEGORY_NOTICE = "notice";
/**
* 消息分类:工作流
*/
private static final String CATEGORY_WORKFLOW = "workflow";
/**
* 消息盒子每页展示最大条数
*/
private static final int BOX_LIMIT = 100;
/**
* 消息盒子展示消息天数仅展示30天内
*/
private static final long BOX_DAYS = 30L;
private final SysMessageMapper baseMapper;
/**
* 查询当前用户消息盒子数据
* 按系统消息、通知公告、工作流消息分类返回
*
* @param userId 用户ID
* @return 分类消息盒子数据
*/
@Override
public SysMessageBoxVo queryMessageBox(Long userId) {
SysMessageBoxVo box = new SysMessageBoxVo();
@@ -55,51 +85,110 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService
return box;
}
/**
* 发送指定用户文本消息
*
* @param userId 目标用户ID
* @param message 文本消息内容
*/
@Override
public void sendMessage(Long userId, String message) {
PushHelper.sendMessage(userId, buildDefaultMessage(message));
}
/**
* 全局广播文本消息
*
* @param message 文本消息内容
*/
@Override
public void sendMessage(String message) {
PushHelper.sendMessage(buildDefaultMessage(message));
}
/**
* 发送指定用户自定义消息体
*
* @param userId 目标用户ID
* @param payload 消息推送体
*/
@Override
public void sendMessage(Long userId, PushPayloadDTO payload) {
PushHelper.sendMessage(userId, payload);
}
/**
* 全局广播自定义消息体
*
* @param payload 消息推送体
*/
@Override
public void sendMessage(PushPayloadDTO payload) {
PushHelper.sendMessage(payload);
}
/**
* 批量发布消息给指定用户列表
*
* @param userIds 用户ID集合
* @param payload 消息推送体
*/
@Override
public void publishMessage(List<Long> userIds, PushPayloadDTO payload) {
PushHelper.publishMessage(userIds, storeUsers(userIds, payload));
}
/**
* 发布全局广播文本消息
*
* @param message 文本消息内容
*/
@Override
public void publishAll(String message) {
publishAll(buildDefaultMessage(message));
}
/**
* 发布全局广播自定义消息体
*
* @param payload 消息推送体
*/
@Override
public void publishAll(PushPayloadDTO payload) {
PushHelper.publishAll(storeAll(payload));
}
/**
* 存储全局广播消息到数据库
*
* @param payload 消息推送体
* @return 回填消息ID后的消息体
*/
@Override
public PushPayloadDTO storeAll(PushPayloadDTO payload) {
return storeMessage(null, payload);
}
/**
* 存储指定用户消息到数据库
*
* @param userIds 用户ID集合
* @param payload 消息推送体
* @return 回填消息ID后的消息体
*/
@Override
public PushPayloadDTO storeUsers(List<Long> userIds, PushPayloadDTO payload) {
return storeMessage(userIds, payload);
}
/**
* 统一消息存储逻辑
* 判断是否需要存入消息盒子,需要则插入数据库
*
* @param userIds 用户ID集合为null则全局广播
* @param payload 消息推送体
* @return 回填消息ID后的消息体
*/
private PushPayloadDTO storeMessage(List<Long> userIds, PushPayloadDTO payload) {
if (!supportsMessageBox(payload)) {
return payload;
@@ -110,20 +199,42 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService
return payload;
}
/**
* 根据分类和用户ID查询消息列表
* 仅查询30天内、最多100条、按时间倒序
*
* @param category 消息分类
* @param userId 用户ID
* @return 消息VO列表
*/
private List<SysMessageVo> selectMessageList(String category, Long userId) {
LambdaQueryWrapper<SysMessage> lqw = Wrappers.lambdaQuery();
// 分类匹配
lqw.eq(SysMessage::getCategory, category);
// 仅查询30天内消息
lqw.ge(SysMessage::getCreateTime, new Date(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(BOX_DAYS)));
// 全局消息 或 当前用户在接收人范围内
lqw.and(wrapper -> wrapper.eq(SysMessage::getSendUserIds, GLOBAL_USER_IDS)
.or()
.apply(DataBaseHelper.findInSet(userId, "send_user_ids")));
// 按创建时间+消息ID倒序
lqw.orderByDesc(SysMessage::getCreateTime, SysMessage::getMessageId);
// 分页查询只查第一页最多100条
List<SysMessage> list = baseMapper.selectList(new Page<>(1, BOX_LIMIT, false), lqw);
// 转换为VO并返回
return list.stream().map(this::buildVo).toList();
}
/**
* 构建消息实体(用于数据库存储)
*
* @param userIds 接收用户ID集合
* @param payload 消息推送体
* @return 系统消息实体
*/
private SysMessage buildMessage(List<Long> userIds, PushPayloadDTO payload) {
SysMessage message = new SysMessage();
// 设置消息ID无则自动生成
message.setMessageId(payload.getMessageId() == null ? IdGeneratorUtil.nextLongId() : payload.getMessageId());
message.setCategory(resolveCategory(payload));
message.setType(payload.getType());
@@ -133,20 +244,35 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService
message.setContent(resolveContent(payload));
message.setDataJson(JsonUtils.toJsonString(payload.getData()));
message.setPath(payload.getPath());
// 设置接收人(无则为全局广播)
message.setSendUserIds(CollUtil.isEmpty(userIds) ? GLOBAL_USER_IDS : StringUtils.joinComma(userIds));
return message;
}
/**
* 消息实体转换为展示VO
*
* @param entity 消息实体
* @return 消息展示VO
*/
private SysMessageVo buildVo(SysMessage entity) {
SysMessageVo vo = MapstructUtils.convert(entity, SysMessageVo.class);
vo.setData(parseData(entity.getDataJson()));
return vo;
}
/**
* 判断消息是否需要存入消息盒子
* 仅系统消息、通知消息需要存入
*
* @param payload 消息推送体
* @return 是否支持存入消息盒子
*/
private boolean supportsMessageBox(PushPayloadDTO payload) {
if (payload == null) {
return false;
}
// 仅消息/通知类型需要存入排除LLM大模型消息
if (StringUtils.equalsAny(payload.getType(), PushTypeEnum.MESSAGE.getType(), PushTypeEnum.NOTICE.getType())) {
return !StringUtils.equalsAny(payload.getType(), PushTypeEnum.LLM.getType())
&& !StringUtils.equalsAny(payload.getSource(), PushSourceEnum.LLM.getSource());
@@ -154,6 +280,12 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService
return false;
}
/**
* 根据消息类型/来源自动解析消息分类
*
* @param payload 消息推送体
* @return 消息分类system/notice/workflow
*/
private String resolveCategory(PushPayloadDTO payload) {
if (StringUtils.equalsAny(payload.getType(), PushTypeEnum.NOTICE.getType())
|| StringUtils.equalsAny(payload.getSource(), PushSourceEnum.NOTICE.getSource())) {
@@ -165,6 +297,12 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService
return CATEGORY_SYSTEM;
}
/**
* 根据消息分类自动生成消息标题
*
* @param payload 消息推送体
* @return 消息标题
*/
private String resolveTitle(PushPayloadDTO payload) {
return switch (resolveCategory(payload)) {
case CATEGORY_NOTICE -> "通知公告消息";
@@ -173,6 +311,12 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService
};
}
/**
* 解析消息内容从data中提取noticeContent
*
* @param payload 消息推送体
* @return 消息内容
*/
private String resolveContent(PushPayloadDTO payload) {
Object data = payload.getData();
if (data instanceof Map<?, ?> map) {
@@ -181,6 +325,12 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService
return null;
}
/**
* 解析JSON数据字符串为对象
*
* @param dataJson JSON字符串
* @return 解析后对象
*/
private Object parseData(String dataJson) {
if (StringUtils.isBlank(dataJson)) {
return null;
@@ -188,6 +338,12 @@ public class SysMessageServiceImpl implements ISysMessageService, MessageService
return JsonUtils.parseObject(dataJson, Object.class);
}
/**
* 构建默认格式的消息体
*
* @param message 消息内容
* @return 默认消息体
*/
private PushPayloadDTO buildDefaultMessage(String message) {
return PushPayloadDTO.of(PushTypeEnum.MESSAGE, PushSourceEnum.BACKEND, message, null);
}