update 消息推送增加 消息类型 消息来源 前端跳转路径等扩展参数

This commit is contained in:
疯狂的狮子Li
2026-03-26 15:34:02 +08:00
parent fa8e1cd3c0
commit 40011e9acd
13 changed files with 443 additions and 9 deletions

View File

@@ -0,0 +1,83 @@
package org.dromara.common.core.domain.dto;
import lombok.Data;
import org.dromara.common.core.enums.PushSourceEnum;
import org.dromara.common.core.enums.PushTypeEnum;
import org.dromara.common.core.utils.StringUtils;
import java.io.Serial;
import java.io.Serializable;
import java.util.Map;
/**
* 推送给前端的统一消息体
*
* @author Lion Li
*/
@Data
public class PushPayload implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 消息类型
*/
private String type;
/**
* 消息来源
*/
private String source;
/**
* 文本消息
*/
private String message;
/**
* 扩展数据
*/
private Object data;
/**
* 前端跳转路径
*/
private String path;
/**
* 前端跳转参数
*/
private Map<String, Object> query;
/**
* 时间戳
*/
private Long timestamp;
public static PushPayload of(String type, String source, String message, Object data) {
PushPayload payload = new PushPayload();
payload.setType(StringUtils.defaultIfBlank(type, PushTypeEnum.MESSAGE.getType()));
payload.setSource(StringUtils.defaultIfBlank(source, PushSourceEnum.BACKEND.getSource()));
payload.setMessage(message);
payload.setData(data);
payload.setTimestamp(System.currentTimeMillis());
return payload;
}
public static PushPayload of(PushTypeEnum type, PushSourceEnum source, String message, Object data) {
return of(
type == null ? null : type.getType(),
source == null ? null : source.getSource(),
message,
data
);
}
public static PushPayload of(PushTypeEnum type, PushSourceEnum source, String message, Object data, String path, Map<String, Object> query) {
PushPayload payload = of(type, source, message, data);
payload.setPath(path);
payload.setQuery(query);
return payload;
}
}

View File

@@ -0,0 +1,41 @@
package org.dromara.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 推送消息来源枚举
*
* @author Lion Li
*/
@Getter
@AllArgsConstructor
public enum PushSourceEnum {
/**
* 后端系统消息
*/
BACKEND("backend"),
/**
* 通知公告
*/
NOTICE("notice"),
/**
* 工作流
*/
WORKFLOW("workflow"),
/**
* 大模型
*/
LLM("llm"),
/**
* 客户端消息
*/
CLIENT("client");
private final String source;
}

View File

@@ -0,0 +1,36 @@
package org.dromara.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 推送消息类型枚举
*
* @author Lion Li
*/
@Getter
@AllArgsConstructor
public enum PushTypeEnum {
/**
* 通用消息
*/
MESSAGE("message"),
/**
* 通知公告
*/
NOTICE("notice"),
/**
* 大模型消息
*/
LLM("llm"),
/**
* 自定义消息
*/
CUSTOM("custom");
private final String type;
}

View File

@@ -3,7 +3,9 @@ package org.dromara.common.sse.core;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.dto.PushPayload;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.sse.dto.SseMessageDTO;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -188,6 +190,26 @@ public class SseEmitterManager {
}
}
/**
* 向指定用户的全部本地 SSE 会话发送统一 JSON 消息。
*
* @param userId 要发送消息的用户id
* @param payload 要发送的消息体
*/
public void sendMessage(Long userId, PushPayload payload) {
sendMessage(userId, JsonUtils.toJsonString(payload));
}
/**
* 向指定用户的全部本地 SSE 会话发送统一 JSON 消息。
*
* @param userId 要发送消息的用户id
* @param sseMessageDTO 要发送的消息内容
*/
public void sendMessage(Long userId, SseMessageDTO sseMessageDTO) {
sendMessage(userId, buildPayload(sseMessageDTO));
}
/**
* 向当前节点所有 SSE 会话发送消息。
*
@@ -199,6 +221,15 @@ public class SseEmitterManager {
}
}
/**
* 向当前节点所有 SSE 会话发送统一 JSON 消息。
*
* @param payload 要发送的消息体
*/
public void sendMessage(PushPayload payload) {
sendMessage(JsonUtils.toJsonString(payload));
}
/**
* 发布 SSE 订阅消息。
*
@@ -226,4 +257,35 @@ public class SseEmitterManager {
log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message);
});
}
/**
* 发布 SSE 广播 JSON 消息。
*
* @param payload 要发布的消息体
*/
public void publishAll(PushPayload payload) {
SseMessageDTO broadcastMessage = new SseMessageDTO();
broadcastMessage.setMessage(payload.getMessage());
broadcastMessage.setMessageType(payload.getType());
broadcastMessage.setMessageSource(payload.getSource());
broadcastMessage.setData(payload.getData());
broadcastMessage.setPath(payload.getPath());
broadcastMessage.setQuery(payload.getQuery());
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
log.info("SSE发送主题订阅消息topic:{} type:{} source:{} message:{}",
SSE_TOPIC, payload.getType(), payload.getSource(), payload.getMessage());
});
}
private String buildPayload(SseMessageDTO sseMessageDTO) {
PushPayload payload = PushPayload.of(
sseMessageDTO.getMessageType(),
sseMessageDTO.getMessageSource(),
sseMessageDTO.getMessage(),
sseMessageDTO.getData()
);
payload.setPath(sseMessageDTO.getPath());
payload.setQuery(sseMessageDTO.getQuery());
return JsonUtils.toJsonString(payload);
}
}

View File

@@ -5,6 +5,7 @@ import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* 消息的dto
@@ -26,4 +27,29 @@ public class SseMessageDTO implements Serializable {
* 需要发送的消息
*/
private String message;
/**
* 消息类型
*/
private String messageType;
/**
* 消息来源
*/
private String messageSource;
/**
* 扩展数据
*/
private Object data;
/**
* 前端跳转路径
*/
private String path;
/**
* 前端跳转参数
*/
private Map<String, Object> query;
}

View File

@@ -32,10 +32,17 @@ public class SseTopicListener implements ApplicationRunner, Ordered {
// 如果key不为空就按照key发消息 如果为空就群发
if (CollUtil.isNotEmpty(message.getUserIds())) {
message.getUserIds().forEach(key -> {
sseEmitterManager.sendMessage(key, message.getMessage());
sseEmitterManager.sendMessage(key, message);
});
} else {
sseEmitterManager.sendMessage(message.getMessage());
sseEmitterManager.sendMessage(
org.dromara.common.core.domain.dto.PushPayload.of(
message.getMessageType(),
message.getMessageSource(),
message.getMessage(),
message.getData()
)
);
}
});
log.info("初始化SSE主题订阅监听器成功");

View File

@@ -3,10 +3,15 @@ package org.dromara.common.sse.utils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.dto.PushPayload;
import org.dromara.common.core.enums.PushSourceEnum;
import org.dromara.common.core.enums.PushTypeEnum;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.sse.core.SseEmitterManager;
import org.dromara.common.sse.dto.SseMessageDTO;
import java.util.List;
/**
* SSE工具类
*
@@ -35,7 +40,7 @@ public class SseMessageUtils {
if (!isEnable()) {
return;
}
MANAGER.sendMessage(userId, message);
MANAGER.sendMessage(userId, buildMessage(message));
}
/**
@@ -47,7 +52,32 @@ public class SseMessageUtils {
if (!isEnable()) {
return;
}
MANAGER.sendMessage(message);
MANAGER.sendMessage(buildMessage(message));
}
/**
* 向指定用户的 SSE 会话发送统一 JSON 消息。
*
* @param userId 要发送消息的用户id
* @param payload 要发送的消息体
*/
public static void sendMessage(Long userId, PushPayload payload) {
if (!isEnable()) {
return;
}
MANAGER.sendMessage(userId, payload);
}
/**
* 向当前节点上的所有 SSE 会话发送统一 JSON 消息。
*
* @param payload 要发送的消息体
*/
public static void sendMessage(PushPayload payload) {
if (!isEnable()) {
return;
}
MANAGER.sendMessage(payload);
}
/**
@@ -71,7 +101,40 @@ public class SseMessageUtils {
if (!isEnable()) {
return;
}
MANAGER.publishAll(message);
MANAGER.publishAll(buildMessage(message));
}
/**
* 向指定用户发布统一 JSON 消息。
*
* @param userIds 目标用户
* @param payload 消息体
*/
public static void publishMessage(List<Long> userIds, PushPayload payload) {
if (!isEnable()) {
return;
}
SseMessageDTO dto = new SseMessageDTO();
dto.setUserIds(userIds);
dto.setMessage(payload.getMessage());
dto.setMessageType(payload.getType());
dto.setMessageSource(payload.getSource());
dto.setData(payload.getData());
dto.setPath(payload.getPath());
dto.setQuery(payload.getQuery());
MANAGER.publishMessage(dto);
}
/**
* 向所有用户发布统一 JSON 消息。
*
* @param payload 消息体
*/
public static void publishAll(PushPayload payload) {
if (!isEnable()) {
return;
}
MANAGER.publishAll(payload);
}
/**
@@ -83,4 +146,8 @@ public class SseMessageUtils {
return SSE_ENABLE;
}
private static PushPayload buildMessage(String message) {
return PushPayload.of(PushTypeEnum.MESSAGE, PushSourceEnum.BACKEND, message, null);
}
}

View File

@@ -5,6 +5,7 @@ import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* 消息的dto
@@ -26,4 +27,29 @@ public class WebSocketMessageDTO implements Serializable {
* 需要发送的消息
*/
private String message;
/**
* 消息类型
*/
private String messageType;
/**
* 消息来源
*/
private String messageSource;
/**
* 扩展数据
*/
private Object data;
/**
* 前端跳转路径
*/
private String path;
/**
* 前端跳转参数
*/
private Map<String, Object> query;
}

View File

@@ -2,6 +2,8 @@ package org.dromara.common.websocket.handler;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.enums.PushSourceEnum;
import org.dromara.common.core.enums.PushTypeEnum;
import org.dromara.common.core.domain.model.LoginUser;
import org.dromara.common.websocket.dto.WebSocketMessageDTO;
import org.dromara.common.websocket.holder.WebSocketSessionHolder;
@@ -54,6 +56,8 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler {
WebSocketMessageDTO messageDTO = new WebSocketMessageDTO();
messageDTO.setSessionKeys(List.of(loginUser.getUserId()));
messageDTO.setMessage(message.getPayload());
messageDTO.setMessageType(PushTypeEnum.CUSTOM.getType());
messageDTO.setMessageSource(PushSourceEnum.CLIENT.getSource());
WebSocketUtils.publishMessage(messageDTO);
}

View File

@@ -31,12 +31,12 @@ public class WebSocketTopicListener implements ApplicationRunner, Ordered {
if (CollUtil.isNotEmpty(message.getSessionKeys())) {
message.getSessionKeys().forEach(key -> {
if (WebSocketSessionHolder.existSession(key)) {
WebSocketUtils.sendMessage(key, message.getMessage());
WebSocketUtils.sendMessage(key, message);
}
});
} else {
WebSocketSessionHolder.getSessionsAll().forEach(key -> {
WebSocketUtils.sendMessage(key, message.getMessage());
WebSocketUtils.sendMessage(key, message);
});
}
});

View File

@@ -4,6 +4,8 @@ import cn.hutool.core.collection.CollUtil;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.dto.PushPayload;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.websocket.dto.WebSocketMessageDTO;
import org.dromara.common.websocket.holder.WebSocketSessionHolder;
@@ -39,6 +41,17 @@ public class WebSocketUtils {
sendMessage(session, message);
}
/**
* 向指定会话标识发送统一 JSON 消息。
*
* @param sessionKey 要发送消息的用户id
* @param webSocketMessage 要发送的消息内容
*/
public static void sendMessage(Long sessionKey, WebSocketMessageDTO webSocketMessage) {
WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
sendMessage(session, buildPayload(webSocketMessage));
}
/**
* 订阅 WebSocket 广播主题消息。
*
@@ -58,7 +71,7 @@ public class WebSocketUtils {
// 当前服务内session,直接发送消息
for (Long sessionKey : webSocketMessage.getSessionKeys()) {
if (WebSocketSessionHolder.existSession(sessionKey)) {
WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage());
WebSocketUtils.sendMessage(sessionKey, webSocketMessage);
continue;
}
unsentSessionKeys.add(sessionKey);
@@ -88,6 +101,43 @@ public class WebSocketUtils {
});
}
/**
* 向指定会话标识发布统一 JSON 消息。
*
* @param sessionKeys 目标会话
* @param payload 消息体
*/
public static void publishMessage(List<Long> sessionKeys, PushPayload payload) {
WebSocketMessageDTO dto = new WebSocketMessageDTO();
dto.setSessionKeys(sessionKeys);
dto.setMessage(payload.getMessage());
dto.setMessageType(payload.getType());
dto.setMessageSource(payload.getSource());
dto.setData(payload.getData());
dto.setPath(payload.getPath());
dto.setQuery(payload.getQuery());
publishMessage(dto);
}
/**
* 向所有 WebSocket 会话发布统一 JSON 消息。
*
* @param payload 消息体
*/
public static void publishAll(PushPayload payload) {
WebSocketMessageDTO broadcastMessage = new WebSocketMessageDTO();
broadcastMessage.setMessage(payload.getMessage());
broadcastMessage.setMessageType(payload.getType());
broadcastMessage.setMessageSource(payload.getSource());
broadcastMessage.setData(payload.getData());
broadcastMessage.setPath(payload.getPath());
broadcastMessage.setQuery(payload.getQuery());
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info("WebSocket发送主题订阅消息topic:{} type:{} source:{} message:{}",
WEB_SOCKET_TOPIC, payload.getType(), payload.getSource(), payload.getMessage());
});
}
/**
* 向指定会话发送 Pong 心跳消息。
*
@@ -107,6 +157,18 @@ public class WebSocketUtils {
sendMessage(session, new TextMessage(message));
}
private static String buildPayload(WebSocketMessageDTO webSocketMessage) {
PushPayload payload = PushPayload.of(
webSocketMessage.getMessageType(),
webSocketMessage.getMessageSource(),
webSocketMessage.getMessage(),
webSocketMessage.getData()
);
payload.setPath(webSocketMessage.getPath());
payload.setQuery(webSocketMessage.getQuery());
return JsonUtils.toJsonString(payload);
}
/**
* 向指定 WebSocket 会话发送原始消息对象。
*

View File

@@ -4,11 +4,14 @@ import cn.dev33.satoken.annotation.SaCheckPermission;
import lombok.RequiredArgsConstructor;
import org.dromara.common.core.domain.PageResult;
import org.dromara.common.core.domain.R;
import org.dromara.common.core.enums.PushSourceEnum;
import org.dromara.common.core.enums.PushTypeEnum;
import org.dromara.common.core.service.DictService;
import org.dromara.common.log.annotation.Log;
import org.dromara.common.log.enums.BusinessType;
import org.dromara.common.mybatis.core.page.PageQuery;
import org.dromara.common.redis.annotation.RepeatSubmit;
import org.dromara.common.sse.dto.SseMessageDTO;
import org.dromara.common.sse.utils.SseMessageUtils;
import org.dromara.common.web.core.BaseController;
import org.dromara.system.domain.bo.SysNoticeBo;
@@ -17,6 +20,9 @@ import org.dromara.system.service.ISysNoticeService;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
/**
* 公告 信息操作处理
*
@@ -72,7 +78,17 @@ public class SysNoticeController extends BaseController {
return R.fail();
}
String type = dictService.getDictLabel("sys_notice_type", notice.getNoticeType());
SseMessageUtils.publishAll("[" + type + "] " + notice.getNoticeTitle());
Map<String, Object> data = new HashMap<>(4);
data.put("noticeType", notice.getNoticeType());
data.put("noticeTypeLabel", type);
data.put("noticeTitle", notice.getNoticeTitle());
SseMessageDTO dto = new SseMessageDTO();
dto.setMessage("[" + type + "] " + notice.getNoticeTitle());
dto.setMessageType(PushTypeEnum.NOTICE.getType());
dto.setMessageSource(PushSourceEnum.NOTICE.getSource());
dto.setData(data);
dto.setPath("/system/notice");
SseMessageUtils.publishMessage(dto);
return R.ok();
}

View File

@@ -5,6 +5,8 @@ import cn.hutool.core.util.ObjectUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.dto.UserDTO;
import org.dromara.common.core.enums.PushSourceEnum;
import org.dromara.common.core.enums.PushTypeEnum;
import org.dromara.common.core.exception.ServiceException;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StreamUtils;
@@ -95,6 +97,8 @@ public class FlwCommonServiceImpl implements IFlwCommonService {
SseMessageDTO dto = new SseMessageDTO();
dto.setUserIds(userIds);
dto.setMessage(message);
dto.setMessageType(PushTypeEnum.MESSAGE.getType());
dto.setMessageSource(PushSourceEnum.WORKFLOW.getSource());
SseMessageUtils.publishMessage(dto);
}
case EMAIL_MESSAGE -> MailUtils.sendText(emails, subject, message);