diff --git a/ruoyi-admin/src/main/java/org/dromara/web/controller/AuthController.java b/ruoyi-admin/src/main/java/org/dromara/web/controller/AuthController.java index fc524645a..f5fdfac2b 100644 --- a/ruoyi-admin/src/main/java/org/dromara/web/controller/AuthController.java +++ b/ruoyi-admin/src/main/java/org/dromara/web/controller/AuthController.java @@ -12,9 +12,12 @@ import me.zhyd.oauth.request.AuthRequest; import me.zhyd.oauth.utils.AuthStateUtils; import org.dromara.common.core.constant.SystemConstants; import org.dromara.common.core.domain.R; +import org.dromara.common.core.domain.dto.PushPayload; import org.dromara.common.core.domain.model.LoginBody; import org.dromara.common.core.domain.model.RegisterBody; import org.dromara.common.core.domain.model.SocialLoginBody; +import org.dromara.common.core.enums.PushSourceEnum; +import org.dromara.common.core.enums.PushTypeEnum; import org.dromara.common.core.utils.DateUtils; import org.dromara.common.core.utils.MessageUtils; import org.dromara.common.core.utils.StringUtils; @@ -23,12 +26,11 @@ import org.dromara.common.encrypt.annotation.ApiEncrypt; import org.dromara.common.json.utils.JsonUtils; import org.dromara.common.redis.annotation.RateLimiter; import org.dromara.common.redis.enums.LimitType; +import org.dromara.common.push.helper.PushHelper; import org.dromara.common.satoken.utils.LoginHelper; import org.dromara.common.social.config.properties.SocialLoginConfigProperties; import org.dromara.common.social.config.properties.SocialProperties; import org.dromara.common.social.utils.SocialUtils; -import org.dromara.common.sse.dto.SseMessageDTO; -import org.dromara.common.sse.utils.SseMessageUtils; import org.dromara.system.domain.vo.SysClientVo; import org.dromara.system.service.ISysClientService; import org.dromara.system.service.ISysConfigService; @@ -93,10 +95,15 @@ public class AuthController { Long userId = LoginHelper.getUserId(); scheduledExecutorService.schedule(() -> { - SseMessageDTO dto = new SseMessageDTO(); - dto.setUserIds(List.of(userId)); - dto.setMessage(DateUtils.getTodayHour(new Date()) + "好,欢迎登录 RuoYi-Vue-Plus 后台管理系统"); - SseMessageUtils.publishMessage(dto); + PushHelper.publishMessage( + List.of(userId), + PushPayload.of( + PushTypeEnum.MESSAGE, + PushSourceEnum.BACKEND, + DateUtils.getTodayHour(new Date()) + "好,欢迎登录 RuoYi-Vue-Plus 后台管理系统", + null + ) + ); }, 5, TimeUnit.SECONDS); return R.ok(loginVo); } diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml index b58c97a42..8f437e925 100644 --- a/ruoyi-admin/src/main/resources/application.yml +++ b/ruoyi-admin/src/main/resources/application.yml @@ -213,18 +213,14 @@ management: logfile: external-file: ./logs/sys-console.log ---- # 默认/推荐使用sse推送 -sse: +--- # 统一消息推送配置 +message: enabled: true - path: /resource/sse - ---- # websocket -websocket: - # 如果关闭 需要和前端开关一起关闭 - enabled: false - # 路径 - path: /resource/websocket - # 设置访问源地址 + # sse / websocket + transport: sse + # 统一访问路径 + path: /resource/message + # websocket 允许的跨域来源 allowedOrigins: '*' --- # warm-flow工作流配置 diff --git a/ruoyi-common/pom.xml b/ruoyi-common/pom.xml index 8d779e122..f6a30c47f 100644 --- a/ruoyi-common/pom.xml +++ b/ruoyi-common/pom.xml @@ -35,8 +35,7 @@ ruoyi-common-sensitive ruoyi-common-json ruoyi-common-encrypt - ruoyi-common-websocket - ruoyi-common-sse + ruoyi-common-push ruoyi-common-mqtt diff --git a/ruoyi-common/ruoyi-common-bom/pom.xml b/ruoyi-common/ruoyi-common-bom/pom.xml index 4f025fdc4..9eb04dd77 100644 --- a/ruoyi-common/ruoyi-common-bom/pom.xml +++ b/ruoyi-common/ruoyi-common-bom/pom.xml @@ -144,17 +144,10 @@ ${revision} - + org.dromara - ruoyi-common-websocket - ${revision} - - - - - org.dromara - ruoyi-common-sse + ruoyi-common-push ${revision} diff --git a/ruoyi-common/ruoyi-common-websocket/pom.xml b/ruoyi-common/ruoyi-common-push/pom.xml similarity index 94% rename from ruoyi-common/ruoyi-common-websocket/pom.xml rename to ruoyi-common/ruoyi-common-push/pom.xml index 0587cd79a..4d888e05f 100644 --- a/ruoyi-common/ruoyi-common-websocket/pom.xml +++ b/ruoyi-common/ruoyi-common-push/pom.xml @@ -9,10 +9,10 @@ 4.0.0 - ruoyi-common-websocket + ruoyi-common-push - ruoyi-common-websocket 模块 + ruoyi-common-push 模块 diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageAutoConfiguration.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageAutoConfiguration.java new file mode 100644 index 000000000..7ea031130 --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageAutoConfiguration.java @@ -0,0 +1,17 @@ +package org.dromara.common.push.config; + +import org.dromara.common.push.properties.MessageProperties; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +/** + * 统一消息推送公共自动装配。 + * + * @author Lion Li + */ +@AutoConfiguration +@ConditionalOnProperty(prefix = "message", name = "enabled", havingValue = "true", matchIfMissing = true) +@EnableConfigurationProperties(MessageProperties.class) +public class MessageAutoConfiguration { +} diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java new file mode 100644 index 000000000..02b9d78ff --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java @@ -0,0 +1,33 @@ +package org.dromara.common.push.config; + +import org.dromara.common.push.controller.SseController; +import org.dromara.common.push.core.SseEmitterSessionManager; +import org.dromara.common.push.listener.MessageTopicListener; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; + +/** + * SSE 消息推送自动装配。 + * + * @author Lion Li + */ +@AutoConfiguration(after = MessageAutoConfiguration.class) +@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "sse", matchIfMissing = true) +public class MessageSseConfiguration { + + @Bean + public SseEmitterSessionManager sseEmitterManager() { + return new SseEmitterSessionManager(); + } + + @Bean + public MessageTopicListener messageTopicListener(SseEmitterSessionManager manager) { + return new MessageTopicListener(manager); + } + + @Bean + public SseController sseController(SseEmitterSessionManager manager) { + return new SseController(manager); + } +} diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java new file mode 100644 index 000000000..b1bb6ec2e --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java @@ -0,0 +1,55 @@ +package org.dromara.common.push.config; + +import org.dromara.common.push.listener.MessageTopicListener; +import org.dromara.common.push.core.WebSocketSessionManager; +import org.dromara.common.push.handler.PlusWebSocketHandler; +import org.dromara.common.push.interceptor.PlusWebSocketInterceptor; +import org.dromara.common.push.properties.MessageProperties; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.server.HandshakeInterceptor; + +/** + * WebSocket 消息推送自动装配。 + * + * @author Lion Li + */ +@EnableWebSocket +@AutoConfiguration(after = MessageAutoConfiguration.class) +@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "websocket") +public class MessageWebSocketConfiguration { + + @Bean + public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, + WebSocketHandler webSocketHandler, + MessageProperties messageProperties) { + return registry -> registry + .addHandler(webSocketHandler, messageProperties.getPath()) + .addInterceptors(handshakeInterceptor) + .setAllowedOrigins(messageProperties.getAllowedOrigins()); + } + + @Bean + public WebSocketSessionManager webSocketSessionManager() { + return new WebSocketSessionManager(); + } + + @Bean + public HandshakeInterceptor handshakeInterceptor() { + return new PlusWebSocketInterceptor(); + } + + @Bean + public WebSocketHandler webSocketHandler(WebSocketSessionManager webSocketSessionManager) { + return new PlusWebSocketHandler(webSocketSessionManager); + } + + @Bean + public MessageTopicListener messageTopicListener(WebSocketSessionManager webSocketSessionManager) { + return new MessageTopicListener(webSocketSessionManager); + } +} diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/constant/MessageConstants.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/constant/MessageConstants.java new file mode 100644 index 000000000..3f66957af --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/constant/MessageConstants.java @@ -0,0 +1,19 @@ +package org.dromara.common.push.constant; + +/** + * 模块通用消息常量定义。 + * + * @author Lion Li + */ +public interface MessageConstants { + + String LOGIN_USER_KEY = "loginUser"; + + String LOGIN_TOKEN_KEY = "token"; + + String MESSAGE_TOPIC = "global:message"; + + String PING = "ping"; + + String PONG = "pong"; +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/controller/SseController.java similarity index 71% rename from ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java rename to ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/controller/SseController.java index 2967d6736..ce42857a8 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/controller/SseController.java @@ -1,13 +1,12 @@ -package org.dromara.common.sse.controller; +package org.dromara.common.push.controller; import cn.dev33.satoken.annotation.SaIgnore; import cn.dev33.satoken.stp.StpUtil; import lombok.RequiredArgsConstructor; import org.dromara.common.core.domain.R; +import org.dromara.common.push.core.SseEmitterSessionManager; import org.dromara.common.satoken.utils.LoginHelper; -import org.dromara.common.sse.core.SseEmitterManager; import org.springframework.beans.factory.DisposableBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @@ -19,25 +18,24 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; * @author Lion Li */ @RestController -@ConditionalOnProperty(value = "sse.enabled", havingValue = "true") @RequiredArgsConstructor public class SseController implements DisposableBean { - private final SseEmitterManager sseEmitterManager; + private final SseEmitterSessionManager sessionManager; /** * 建立当前登录用户的 SSE 连接。 * * @return SSE 发射器 */ - @GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @GetMapping(value = "${message.path:/resource/message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter connect() { if (!StpUtil.isLogin()) { return null; } String tokenValue = StpUtil.getTokenValue(); Long userId = LoginHelper.getUserId(); - return sseEmitterManager.connect(userId, tokenValue); + return sessionManager.connect(userId, tokenValue); } /** @@ -46,11 +44,11 @@ public class SseController implements DisposableBean { * @return 操作结果 */ @SaIgnore - @GetMapping(value = "${sse.path}/close") + @GetMapping(value = "${message.path:/resource/message}/close") public R close() { String tokenValue = StpUtil.getTokenValue(); Long userId = LoginHelper.getUserId(); - sseEmitterManager.disconnect(userId, tokenValue); + sessionManager.disconnect(userId, tokenValue); return R.ok(); } @@ -61,12 +59,12 @@ public class SseController implements DisposableBean { // * @param userId 目标用户的 ID // * @param msg 要发送的消息内容 // */ -// @GetMapping(value = "${sse.path}/send") +// @GetMapping(value = "${message.path:/resource/message}/send") // public R send(Long userId, String msg) { -// SseMessageDTO dto = new SseMessageDTO(); +// PushDTO dto = new PushDTO(); // dto.setUserIds(List.of(userId)); -// dto.setMessage(msg); -// sseEmitterManager.publishMessage(dto); +// dto.setPayload(PushPayload.of("message", "backend", msg, null)); +// sessionManager.publishMessage(dto); // return R.ok(); // } // @@ -75,9 +73,9 @@ public class SseController implements DisposableBean { // * // * @param msg 要发送的消息内容 // */ -// @GetMapping(value = "${sse.path}/sendAll") +// @GetMapping(value = "${message.path:/resource/message}/sendAll") // public R send(String msg) { -// sseEmitterManager.publishAll(msg); +// sessionManager.publishAll(msg); // return R.ok(); // } diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/PushSessionManager.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/PushSessionManager.java new file mode 100644 index 000000000..384a83ec7 --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/PushSessionManager.java @@ -0,0 +1,24 @@ +package org.dromara.common.push.core; + +import org.dromara.common.core.domain.dto.PushPayload; +import org.dromara.common.push.dto.PushDTO; + +import java.util.function.Consumer; + +/** + * 统一推送会话管理器。 + * + * @author Lion Li + */ +public interface PushSessionManager { + + void subscribeMessage(Consumer consumer); + + void sendMessage(Long userId, PushPayload payload); + + void sendMessage(PushPayload payload); + + void publishMessage(PushDTO pushDTO); + + void publishAll(PushPayload payload); +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/SseEmitterSessionManager.java similarity index 75% rename from ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java rename to ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/SseEmitterSessionManager.java index 0540096a1..042e0759d 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/SseEmitterSessionManager.java @@ -1,13 +1,14 @@ -package org.dromara.common.sse.core; +package org.dromara.common.push.core; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.map.MapUtil; import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.domain.dto.PushPayload; +import org.dromara.common.push.constant.MessageConstants; +import org.dromara.common.push.dto.PushDTO; import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.json.utils.JsonUtils; import org.dromara.common.redis.utils.RedisUtils; -import org.dromara.common.sse.dto.SseMessageDTO; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; @@ -25,16 +26,11 @@ import java.util.function.Consumer; * @author Lion Li */ @Slf4j -public class SseEmitterManager { - - /** - * 订阅的频道 - */ - private final static String SSE_TOPIC = "global:sse"; +public class SseEmitterSessionManager implements PushSessionManager { private final static Map> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); - public SseEmitterManager() { + public SseEmitterSessionManager() { // 定时执行 SSE 心跳检测 SpringUtils.getBean(ScheduledExecutorService.class) .scheduleWithFixedDelay(this::sseMonitor, 60L, 60L, TimeUnit.SECONDS); @@ -160,8 +156,9 @@ public class SseEmitterManager { * * @param consumer 处理SSE消息的消费者函数 */ - public void subscribeMessage(Consumer consumer) { - RedisUtils.subscribe(SSE_TOPIC, SseMessageDTO.class, consumer); + @Override + public void subscribeMessage(Consumer consumer) { + RedisUtils.subscribe(MessageConstants.MESSAGE_TOPIC, PushDTO.class, consumer); } /** @@ -196,6 +193,7 @@ public class SseEmitterManager { * @param userId 要发送消息的用户id * @param payload 要发送的消息体 */ + @Override public void sendMessage(Long userId, PushPayload payload) { sendMessage(userId, JsonUtils.toJsonString(payload)); } @@ -204,10 +202,13 @@ public class SseEmitterManager { * 向指定用户的全部本地 SSE 会话发送统一 JSON 消息。 * * @param userId 要发送消息的用户id - * @param sseMessageDTO 要发送的消息内容 + * @param pushDTO 要发送的消息内容 */ - public void sendMessage(Long userId, SseMessageDTO sseMessageDTO) { - sendMessage(userId, buildPayload(sseMessageDTO)); + public void sendMessage(Long userId, PushDTO pushDTO) { + if (pushDTO == null) { + return; + } + sendMessage(userId, pushDTO.getPayload()); } /** @@ -226,6 +227,7 @@ public class SseEmitterManager { * * @param payload 要发送的消息体 */ + @Override public void sendMessage(PushPayload payload) { sendMessage(JsonUtils.toJsonString(payload)); } @@ -233,16 +235,16 @@ public class SseEmitterManager { /** * 发布 SSE 订阅消息。 * - * @param sseMessageDTO 要发布的SSE消息对象 + * @param pushDTO 要发布的SSE消息对象 */ - public void publishMessage(SseMessageDTO sseMessageDTO) { - SseMessageDTO broadcastMessage = new SseMessageDTO(); - broadcastMessage.setUserIds(sseMessageDTO.getUserIds()); - broadcastMessage.setMessage(sseMessageDTO.getMessage()); - RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { - log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", - SSE_TOPIC, sseMessageDTO.getUserIds(), sseMessageDTO.getMessage()); - }); + @Override + public void publishMessage(PushDTO pushDTO) { + RedisUtils.publish(MessageConstants.MESSAGE_TOPIC, pushDTO, consumer -> log.info( + "发送主题订阅消息topic:{} userIds:{} message:{}", + MessageConstants.MESSAGE_TOPIC, + pushDTO.getUserIds(), + pushDTO.getPayload() == null ? null : pushDTO.getPayload().getMessage() + )); } /** @@ -251,11 +253,7 @@ public class SseEmitterManager { * @param message 要发布的消息内容 */ public void publishAll(String message) { - SseMessageDTO broadcastMessage = new SseMessageDTO(); - broadcastMessage.setMessage(message); - RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { - log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message); - }); + publishAll(PushPayload.of("message", "backend", message, null)); } /** @@ -263,29 +261,13 @@ public class SseEmitterManager { * * @param payload 要发布的消息体 */ + @Override public void publishAll(PushPayload payload) { - SseMessageDTO broadcastMessage = new SseMessageDTO(); - broadcastMessage.setMessage(payload.getMessage()); - broadcastMessage.setMessageType(payload.getType()); - broadcastMessage.setMessageSource(payload.getSource()); - broadcastMessage.setData(payload.getData()); - broadcastMessage.setPath(payload.getPath()); - broadcastMessage.setQuery(payload.getQuery()); - RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { - log.info("SSE发送主题订阅消息topic:{} type:{} source:{} message:{}", - SSE_TOPIC, payload.getType(), payload.getSource(), payload.getMessage()); + PushDTO dto = new PushDTO(); + dto.setPayload(payload); + RedisUtils.publish(MessageConstants.MESSAGE_TOPIC, dto, consumer -> { + log.info("发送主题订阅消息topic:{} type:{} source:{} message:{}", + MessageConstants.MESSAGE_TOPIC, payload.getType(), payload.getSource(), payload.getMessage()); }); } - - private String buildPayload(SseMessageDTO sseMessageDTO) { - PushPayload payload = PushPayload.of( - sseMessageDTO.getMessageType(), - sseMessageDTO.getMessageSource(), - sseMessageDTO.getMessage(), - sseMessageDTO.getData() - ); - payload.setPath(sseMessageDTO.getPath()); - payload.setQuery(sseMessageDTO.getQuery()); - return JsonUtils.toJsonString(payload); - } } diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java new file mode 100644 index 000000000..30f34df79 --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java @@ -0,0 +1,168 @@ +package org.dromara.common.push.core; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.map.MapUtil; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.domain.dto.PushPayload; +import org.dromara.common.core.utils.SpringUtils; +import org.dromara.common.json.utils.JsonUtils; +import org.dromara.common.push.dto.PushDTO; +import org.dromara.common.redis.utils.RedisUtils; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.PongMessage; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.dromara.common.push.constant.MessageConstants.MESSAGE_TOPIC; + +/** + * WebSocket 会话管理器。 + * + * @author Lion Li + */ +@Slf4j +public class WebSocketSessionManager implements PushSessionManager { + + private static final Map> USER_TOKEN_SESSIONS = new ConcurrentHashMap<>(); + + public WebSocketSessionManager() { + SpringUtils.getBean(ScheduledExecutorService.class) + .scheduleWithFixedDelay(this::sessionMonitor, 60L, 60L, TimeUnit.SECONDS); + } + + public void connect(Long userId, String token, WebSocketSession session) { + Map sessions = USER_TOKEN_SESSIONS.computeIfAbsent(userId, key -> new ConcurrentHashMap<>()); + WebSocketSession oldSession = sessions.remove(token); + closeSession(oldSession, CloseStatus.NORMAL); + sessions.put(token, session); + } + + public void disconnect(Long userId, String token) { + if (userId == null || token == null) { + return; + } + Map sessions = USER_TOKEN_SESSIONS.get(userId); + if (MapUtil.isEmpty(sessions)) { + USER_TOKEN_SESSIONS.remove(userId); + return; + } + closeSession(sessions.remove(token), CloseStatus.NORMAL); + if (sessions.isEmpty()) { + USER_TOKEN_SESSIONS.remove(userId); + } + } + + public void sessionMonitor() { + List toRemoveUsers = new ArrayList<>(); + USER_TOKEN_SESSIONS.forEach((userId, sessionMap) -> { + if (CollUtil.isEmpty(sessionMap)) { + toRemoveUsers.add(userId); + return; + } + sessionMap.entrySet().removeIf(entry -> { + WebSocketSession session = entry.getValue(); + if (session == null || !session.isOpen()) { + closeSession(session, CloseStatus.NORMAL); + return true; + } + return false; + }); + if (sessionMap.isEmpty()) { + toRemoveUsers.add(userId); + } + }); + toRemoveUsers.forEach(USER_TOKEN_SESSIONS::remove); + } + + @Override + public void subscribeMessage(Consumer consumer) { + RedisUtils.subscribe(MESSAGE_TOPIC, PushDTO.class, consumer); + } + + @Override + public void sendMessage(Long userId, PushPayload payload) { + if (payload == null) { + return; + } + Map sessions = USER_TOKEN_SESSIONS.get(userId); + if (MapUtil.isEmpty(sessions)) { + USER_TOKEN_SESSIONS.remove(userId); + return; + } + sessions.entrySet().removeIf(entry -> { + WebSocketSession session = entry.getValue(); + if (session == null || !session.isOpen()) { + closeSession(session, CloseStatus.NORMAL); + return true; + } + return !sendMessage(session, new TextMessage(JsonUtils.toJsonString(payload))); + }); + if (sessions.isEmpty()) { + USER_TOKEN_SESSIONS.remove(userId); + } + } + + @Override + public void sendMessage(PushPayload payload) { + USER_TOKEN_SESSIONS.keySet().forEach(userId -> sendMessage(userId, payload)); + } + + @Override + public void publishMessage(PushDTO pushDTO) { + RedisUtils.publish(MESSAGE_TOPIC, pushDTO, consumer -> log.info( + "WebSocket发送主题订阅消息topic:{} userIds:{} message:{}", + MESSAGE_TOPIC, + pushDTO.getUserIds(), + pushDTO.getPayload() == null ? null : pushDTO.getPayload().getMessage() + )); + } + + @Override + public void publishAll(PushPayload payload) { + PushDTO dto = new PushDTO(); + dto.setPayload(payload); + publishMessage(dto); + } + + public void sendPongMessage(WebSocketSession session) { + sendMessage(session, new PongMessage()); + } + + public void sendMessage(WebSocketSession session, String message) { + sendMessage(session, new TextMessage(message)); + } + + private boolean sendMessage(WebSocketSession session, WebSocketMessage message) { + if (session == null || !session.isOpen()) { + log.warn("[send] session会话已经关闭"); + return false; + } + try { + session.sendMessage(message); + return true; + } catch (IOException e) { + log.error("[send] session({}) 发送消息({}) 异常", session, message, e); + return false; + } + } + + private void closeSession(WebSocketSession session, CloseStatus status) { + if (session == null) { + return; + } + try { + session.close(status); + } catch (Exception ignored) { + } + } +} diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/dto/PushDTO.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/dto/PushDTO.java new file mode 100644 index 000000000..8a31df0b5 --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/dto/PushDTO.java @@ -0,0 +1,30 @@ +package org.dromara.common.push.dto; + +import lombok.Data; +import org.dromara.common.core.domain.dto.PushPayload; + +import java.io.Serial; +import java.io.Serializable; +import java.util.List; + +/** + * 统一推送 DTO。 + * + * @author Lion Li + */ +@Data +public class PushDTO implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** + * 目标用户 ID 列表,为空表示广播。 + */ + private List userIds; + + /** + * 推送消息体。 + */ + private PushPayload payload; +} diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/enums/MessageTransportEnum.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/enums/MessageTransportEnum.java new file mode 100644 index 000000000..ee5660e1b --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/enums/MessageTransportEnum.java @@ -0,0 +1,32 @@ +package org.dromara.common.push.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.Arrays; + +/** + * 消息推送传输方式。 + * + * @author Lion Li + */ +@Getter +@AllArgsConstructor +public enum MessageTransportEnum { + + SSE("sse"), + WEBSOCKET("websocket"); + + private final String code; + + public boolean matches(String transport) { + return code.equalsIgnoreCase(transport); + } + + public static MessageTransportEnum of(String transport) { + return Arrays.stream(values()) + .filter(item -> item.matches(transport)) + .findFirst() + .orElse(SSE); + } +} diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java new file mode 100644 index 000000000..f704c9245 --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java @@ -0,0 +1,104 @@ +package org.dromara.common.push.handler; + +import cn.hutool.core.util.ObjectUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.domain.dto.PushPayload; +import org.dromara.common.core.domain.model.LoginUser; +import org.dromara.common.core.enums.PushSourceEnum; +import org.dromara.common.core.enums.PushTypeEnum; +import org.dromara.common.push.constant.MessageConstants; +import org.dromara.common.push.core.WebSocketSessionManager; +import org.dromara.common.push.dto.PushDTO; +import org.springframework.web.socket.BinaryMessage; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.PongMessage; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.AbstractWebSocketHandler; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; + +import java.io.IOException; +import java.util.List; + +/** + * WebSocket Handler。 + * + * @author Lion Li + */ +@RequiredArgsConstructor +@Slf4j +public class PlusWebSocketHandler extends AbstractWebSocketHandler { + + private final WebSocketSessionManager webSocketSessionManager; + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws IOException { + LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY); + String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY); + if (ObjectUtil.hasNull(loginUser, token)) { + session.close(CloseStatus.BAD_DATA); + log.info("[connect] invalid token received. sessionId: {}", session.getId()); + return; + } + webSocketSessionManager.connect( + loginUser.getUserId(), + token, + new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64_000) + ); + log.info("[connect] sessionId: {}, userId:{}, token:{}", session.getId(), loginUser.getUserId(), token); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) { + LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY); + if (ObjectUtil.isNull(loginUser)) { + return; + } + if (MessageConstants.PING.equalsIgnoreCase(message.getPayload())) { + webSocketSessionManager.sendMessage(session, MessageConstants.PONG); + return; + } + PushDTO dto = new PushDTO(); + dto.setUserIds(List.of(loginUser.getUserId())); + dto.setPayload(PushPayload.of( + PushTypeEnum.CUSTOM, + PushSourceEnum.CLIENT, + message.getPayload(), + null + )); + webSocketSessionManager.publishMessage(dto); + } + + @Override + protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { + super.handleBinaryMessage(session, message); + } + + @Override + protected void handlePongMessage(WebSocketSession session, PongMessage message) { + webSocketSessionManager.sendPongMessage(session); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) { + log.error("[transport error] sessionId: {}, exception:{}", session.getId(), exception.getMessage()); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { + LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY); + String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY); + if (ObjectUtil.hasNull(loginUser, token)) { + log.info("[disconnect] invalid token received. sessionId: {}", session.getId()); + return; + } + webSocketSessionManager.disconnect(loginUser.getUserId(), token); + log.info("[disconnect] sessionId: {}, userId:{}, token:{}", session.getId(), loginUser.getUserId(), token); + } + + @Override + public boolean supportsPartialMessages() { + return false; + } +} diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java new file mode 100644 index 000000000..dedf5e462 --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java @@ -0,0 +1,80 @@ +package org.dromara.common.push.helper; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.dromara.common.core.domain.dto.PushPayload; +import org.dromara.common.core.enums.PushSourceEnum; +import org.dromara.common.core.enums.PushTypeEnum; +import org.dromara.common.core.utils.SpringUtils; +import org.dromara.common.push.core.PushSessionManager; +import org.dromara.common.push.dto.PushDTO; + +import java.util.List; + +/** + * 统一消息推送工具。 + * + * @author Lion Li + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class PushHelper { + + public static void sendMessage(Long userId, String message) { + sendMessage(userId, buildMessage(message)); + } + + public static void sendMessage(String message) { + sendMessage(buildMessage(message)); + } + + public static void sendMessage(Long userId, PushPayload payload) { + if (!isEnabled()) { + return; + } + getSessionManager().sendMessage(userId, payload); + } + + public static void sendMessage(PushPayload payload) { + if (!isEnabled()) { + return; + } + getSessionManager().sendMessage(payload); + } + + public static void publishMessage(List userIds, PushPayload payload) { + PushDTO dto = new PushDTO(); + dto.setUserIds(userIds); + dto.setPayload(payload); + publishMessage(dto); + } + + public static void publishMessage(PushDTO dto) { + if (!isEnabled() || dto == null || dto.getPayload() == null) { + return; + } + getSessionManager().publishMessage(dto); + } + + public static void publishAll(String message) { + publishAll(buildMessage(message)); + } + + public static void publishAll(PushPayload payload) { + if (!isEnabled()) { + return; + } + getSessionManager().publishAll(payload); + } + + public static boolean isEnabled() { + return Boolean.TRUE.equals(SpringUtils.getProperty("message.enabled", Boolean.class, Boolean.TRUE)); + } + + private static PushSessionManager getSessionManager() { + return SpringUtils.getBean(PushSessionManager.class); + } + + private static PushPayload buildMessage(String message) { + return PushPayload.of(PushTypeEnum.MESSAGE, PushSourceEnum.BACKEND, message, null); + } +} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java similarity index 53% rename from ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java rename to ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java index c70e37729..a830be5f9 100644 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java @@ -1,4 +1,4 @@ -package org.dromara.common.websocket.interceptor; +package org.dromara.common.push.interceptor; import cn.dev33.satoken.exception.NotLoginException; import cn.dev33.satoken.stp.StpUtil; @@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.domain.model.LoginUser; import org.dromara.common.core.utils.ServletUtils; import org.dromara.common.core.utils.StringUtils; +import org.dromara.common.push.constant.MessageConstants; import org.dromara.common.satoken.utils.LoginHelper; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; @@ -13,45 +14,35 @@ import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.Map; - -import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; - /** - * WebSocket握手请求的拦截器 + * WebSocket 握手拦截器。 * - * @author zendwang + * @author Lion Li */ @Slf4j public class PlusWebSocketInterceptor implements HandshakeInterceptor { - /** - * WebSocket握手之前执行的前置处理方法 - * - * @param request WebSocket握手请求 - * @param response WebSocket握手响应 - * @param wsHandler WebSocket处理程序 - * @param attributes 与WebSocket会话关联的属性 - * @return 如果允许握手继续进行,则返回true;否则返回false - */ @Override - public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) { + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, + Map attributes) { try { - // 检查是否登录 是否有token LoginUser loginUser = LoginHelper.getLoginUser(); + String tokenValue = StpUtil.getTokenValue(); + if (loginUser == null || StringUtils.isBlank(tokenValue)) { + return false; + } - // 解决 ws 不走 mvc 拦截器问题(cloud 版本不受影响) - // 检查 header 与 param 里的 clientid 与 token 里的是否一致 String headerCid = ServletUtils.getRequest().getHeader(LoginHelper.CLIENT_KEY); String paramCid = ServletUtils.getParameter(LoginHelper.CLIENT_KEY); String clientId = StpUtil.getExtra(LoginHelper.CLIENT_KEY).toString(); if (!StringUtils.equalsAny(clientId, headerCid, paramCid)) { - // token 无效 throw NotLoginException.newInstance(StpUtil.getLoginType(), "-100", "客户端ID与Token不匹配", StpUtil.getTokenValue()); } - attributes.put(LOGIN_USER_KEY, loginUser); + attributes.put(MessageConstants.LOGIN_USER_KEY, loginUser); + attributes.put(MessageConstants.LOGIN_TOKEN_KEY, tokenValue); return true; } catch (NotLoginException e) { log.error("WebSocket 认证失败'{}',无法访问系统资源", e.getMessage()); @@ -59,17 +50,8 @@ public class PlusWebSocketInterceptor implements HandshakeInterceptor { } } - /** - * WebSocket握手成功后执行的后置处理方法 - * - * @param request WebSocket握手请求 - * @param response WebSocket握手响应 - * @param wsHandler WebSocket处理程序 - * @param exception 握手过程中可能出现的异常 - */ @Override - public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { - // 在这个方法中可以执行一些握手成功后的后续处理逻辑,比如记录日志或者其他操作 + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, + Exception exception) { } - } diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java new file mode 100644 index 000000000..4f1bbad7a --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java @@ -0,0 +1,44 @@ +package org.dromara.common.push.listener; + +import cn.hutool.core.collection.CollUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.push.core.PushSessionManager; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.Ordered; + +/** + * 统一消息主题订阅监听器。 + * + * @author Lion Li + */ +@Slf4j +@RequiredArgsConstructor +public class MessageTopicListener implements ApplicationRunner, Ordered { + + private final PushSessionManager pushSessionManager; + + @Override + public void run(ApplicationArguments args) { + pushSessionManager.subscribeMessage(message -> { + log.info("消息主题订阅收到消息userIds={} message={}", + message.getUserIds(), + message.getPayload() == null ? null : message.getPayload().getMessage()); + if (message.getPayload() == null) { + return; + } + if (CollUtil.isNotEmpty(message.getUserIds())) { + message.getUserIds().forEach(userId -> pushSessionManager.sendMessage(userId, message.getPayload())); + } else { + pushSessionManager.sendMessage(message.getPayload()); + } + }); + log.info("初始化消息主题订阅监听器成功"); + } + + @Override + public int getOrder() { + return -1; + } +} diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/properties/MessageProperties.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/properties/MessageProperties.java new file mode 100644 index 000000000..f444123ef --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/properties/MessageProperties.java @@ -0,0 +1,35 @@ +package org.dromara.common.push.properties; + +import lombok.Data; +import org.dromara.common.push.enums.MessageTransportEnum; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * 统一消息推送配置。 + * + * @author Lion Li + */ +@Data +@ConfigurationProperties("message") +public class MessageProperties { + + /** + * 是否启用消息推送。 + */ + private Boolean enabled = true; + + /** + * 传输方式:sse / websocket。 + */ + private String transport = MessageTransportEnum.SSE.getCode(); + + /** + * 统一访问路径。 + */ + private String path = "/resource/message"; + + /** + * WebSocket 允许的跨域来源。 + */ + private String allowedOrigins = "*"; +} diff --git a/ruoyi-common/ruoyi-common-push/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-push/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 000000000..2e02f912d --- /dev/null +++ b/ruoyi-common/ruoyi-common-push/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1,3 @@ +org.dromara.common.push.config.MessageAutoConfiguration +org.dromara.common.push.config.MessageSseConfiguration +org.dromara.common.push.config.MessageWebSocketConfiguration diff --git a/ruoyi-common/ruoyi-common-security/src/main/java/org/dromara/common/security/config/SecurityConfig.java b/ruoyi-common/ruoyi-common-security/src/main/java/org/dromara/common/security/config/SecurityConfig.java index e94f4dfc1..eee99c74c 100644 --- a/ruoyi-common/ruoyi-common-security/src/main/java/org/dromara/common/security/config/SecurityConfig.java +++ b/ruoyi-common/ruoyi-common-security/src/main/java/org/dromara/common/security/config/SecurityConfig.java @@ -39,8 +39,8 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; public class SecurityConfig implements WebMvcConfigurer { private final SecurityProperties securityProperties; - @Value("${sse.path}") - private String ssePath; + @Value("${message.path:/resource/message}") + private String messagePath; /** * 注册 Sa-Token 路由拦截器并配置鉴权规则。 @@ -85,7 +85,7 @@ public class SecurityConfig implements WebMvcConfigurer { })).addPathPatterns("/**") // 排除不需要拦截的路径 .excludePathPatterns(securityProperties.getExcludes()) - .excludePathPatterns(ssePath); + .excludePathPatterns(messagePath); } /** diff --git a/ruoyi-common/ruoyi-common-sse/pom.xml b/ruoyi-common/ruoyi-common-sse/pom.xml deleted file mode 100644 index ae44c988e..000000000 --- a/ruoyi-common/ruoyi-common-sse/pom.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - - org.dromara - ruoyi-common - ${revision} - - 4.0.0 - - ruoyi-common-sse - - - ruoyi-common-sse 模块 - - - - - org.dromara - ruoyi-common-core - - - org.dromara - ruoyi-common-redis - - - org.dromara - ruoyi-common-satoken - - - org.dromara - ruoyi-common-json - - - diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java deleted file mode 100644 index 0cf8054ed..000000000 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.dromara.common.sse.config; - -import org.dromara.common.sse.controller.SseController; -import org.dromara.common.sse.core.SseEmitterManager; -import org.dromara.common.sse.listener.SseTopicListener; -import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; - -/** - * SSE 自动装配 - * - * @author Lion Li - */ -@AutoConfiguration -@ConditionalOnProperty(value = "sse.enabled", havingValue = "true") -@EnableConfigurationProperties(SseProperties.class) -public class SseAutoConfiguration { - - @Bean - public SseEmitterManager sseEmitterManager() { - return new SseEmitterManager(); - } - - @Bean - public SseTopicListener sseTopicListener() { - return new SseTopicListener(); - } - - @Bean - public SseController sseController(SseEmitterManager sseEmitterManager) { - return new SseController(sseEmitterManager); - } - -} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java deleted file mode 100644 index ce4e1732d..000000000 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.dromara.common.sse.config; - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; - -/** - * SSE 配置项 - * - * @author Lion Li - */ -@Data -@ConfigurationProperties("sse") -public class SseProperties { - - private Boolean enabled; - - /** - * 路径 - */ - private String path; -} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDTO.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDTO.java deleted file mode 100644 index d0fb22fc3..000000000 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDTO.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.dromara.common.sse.dto; - -import lombok.Data; - -import java.io.Serial; -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -/** - * 消息的dto - * - * @author zendwang - */ -@Data -public class SseMessageDTO implements Serializable { - - @Serial - private static final long serialVersionUID = 1L; - - /** - * 需要推送到的session key 列表 - */ - private List userIds; - - /** - * 需要发送的消息 - */ - private String message; - - /** - * 消息类型 - */ - private String messageType; - - /** - * 消息来源 - */ - private String messageSource; - - /** - * 扩展数据 - */ - private Object data; - - /** - * 前端跳转路径 - */ - private String path; - - /** - * 前端跳转参数 - */ - private Map query; -} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java deleted file mode 100644 index 87dc02e70..000000000 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.dromara.common.sse.listener; - -import cn.hutool.core.collection.CollUtil; -import lombok.extern.slf4j.Slf4j; -import org.dromara.common.sse.core.SseEmitterManager; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.core.Ordered; - -/** - * SSE 主题订阅监听器 - * - * @author Lion Li - */ -@Slf4j -public class SseTopicListener implements ApplicationRunner, Ordered { - - @Autowired - private SseEmitterManager sseEmitterManager; - - /** - * 在Spring Boot应用程序启动时初始化SSE主题订阅监听器 - * - * @param args 应用程序参数 - * @throws Exception 初始化过程中可能抛出的异常 - */ - @Override - public void run(ApplicationArguments args) throws Exception { - sseEmitterManager.subscribeMessage((message) -> { - log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage()); - // 如果key不为空就按照key发消息 如果为空就群发 - if (CollUtil.isNotEmpty(message.getUserIds())) { - message.getUserIds().forEach(key -> { - sseEmitterManager.sendMessage(key, message); - }); - } else { - sseEmitterManager.sendMessage( - org.dromara.common.core.domain.dto.PushPayload.of( - message.getMessageType(), - message.getMessageSource(), - message.getMessage(), - message.getData() - ) - ); - } - }); - log.info("初始化SSE主题订阅监听器成功"); - } - - @Override - public int getOrder() { - return -1; - } -} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java deleted file mode 100644 index 3dd613b30..000000000 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java +++ /dev/null @@ -1,153 +0,0 @@ -package org.dromara.common.sse.utils; - -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.dromara.common.core.domain.dto.PushPayload; -import org.dromara.common.core.enums.PushSourceEnum; -import org.dromara.common.core.enums.PushTypeEnum; -import org.dromara.common.core.utils.SpringUtils; -import org.dromara.common.sse.core.SseEmitterManager; -import org.dromara.common.sse.dto.SseMessageDTO; - -import java.util.List; - -/** - * SSE工具类 - * - * @author Lion Li - */ -@Slf4j -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public class SseMessageUtils { - - private final static Boolean SSE_ENABLE = SpringUtils.getProperty("sse.enabled", Boolean.class, true); - private static SseEmitterManager MANAGER; - - static { - if (isEnable() && MANAGER == null) { - MANAGER = SpringUtils.getBean(SseEmitterManager.class); - } - } - - /** - * 向指定用户的 SSE 会话发送消息。 - * - * @param userId 要发送消息的用户id - * @param message 要发送的消息内容 - */ - public static void sendMessage(Long userId, String message) { - if (!isEnable()) { - return; - } - MANAGER.sendMessage(userId, buildMessage(message)); - } - - /** - * 向当前节点上的所有 SSE 会话发送消息。 - * - * @param message 要发送的消息内容 - */ - public static void sendMessage(String message) { - if (!isEnable()) { - return; - } - MANAGER.sendMessage(buildMessage(message)); - } - - /** - * 向指定用户的 SSE 会话发送统一 JSON 消息。 - * - * @param userId 要发送消息的用户id - * @param payload 要发送的消息体 - */ - public static void sendMessage(Long userId, PushPayload payload) { - if (!isEnable()) { - return; - } - MANAGER.sendMessage(userId, payload); - } - - /** - * 向当前节点上的所有 SSE 会话发送统一 JSON 消息。 - * - * @param payload 要发送的消息体 - */ - public static void sendMessage(PushPayload payload) { - if (!isEnable()) { - return; - } - MANAGER.sendMessage(payload); - } - - /** - * 发布 SSE 订阅消息。 - * - * @param sseMessageDTO 要发布的SSE消息对象 - */ - public static void publishMessage(SseMessageDTO sseMessageDTO) { - if (!isEnable()) { - return; - } - MANAGER.publishMessage(sseMessageDTO); - } - - /** - * 向所有用户发布 SSE 广播消息。 - * - * @param message 要发布的消息内容 - */ - public static void publishAll(String message) { - if (!isEnable()) { - return; - } - MANAGER.publishAll(buildMessage(message)); - } - - /** - * 向指定用户发布统一 JSON 消息。 - * - * @param userIds 目标用户 - * @param payload 消息体 - */ - public static void publishMessage(List userIds, PushPayload payload) { - if (!isEnable()) { - return; - } - SseMessageDTO dto = new SseMessageDTO(); - dto.setUserIds(userIds); - dto.setMessage(payload.getMessage()); - dto.setMessageType(payload.getType()); - dto.setMessageSource(payload.getSource()); - dto.setData(payload.getData()); - dto.setPath(payload.getPath()); - dto.setQuery(payload.getQuery()); - MANAGER.publishMessage(dto); - } - - /** - * 向所有用户发布统一 JSON 消息。 - * - * @param payload 消息体 - */ - public static void publishAll(PushPayload payload) { - if (!isEnable()) { - return; - } - MANAGER.publishAll(payload); - } - - /** - * 判断 SSE 功能是否启用。 - * - * @return 是否启用 - */ - public static Boolean isEnable() { - return SSE_ENABLE; - } - - private static PushPayload buildMessage(String message) { - return PushPayload.of(PushTypeEnum.MESSAGE, PushSourceEnum.BACKEND, message, null); - } - -} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports deleted file mode 100644 index b80971390..000000000 --- a/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ /dev/null @@ -1 +0,0 @@ -org.dromara.common.sse.config.SseAutoConfiguration diff --git a/ruoyi-common/ruoyi-common-web/src/main/java/org/dromara/common/web/handler/GlobalExceptionHandler.java b/ruoyi-common/ruoyi-common-web/src/main/java/org/dromara/common/web/handler/GlobalExceptionHandler.java index c59f0f2ac..d02089956 100644 --- a/ruoyi-common/ruoyi-common-web/src/main/java/org/dromara/common/web/handler/GlobalExceptionHandler.java +++ b/ruoyi-common/ruoyi-common-web/src/main/java/org/dromara/common/web/handler/GlobalExceptionHandler.java @@ -11,6 +11,7 @@ import org.dromara.common.core.domain.R; import org.dromara.common.core.exception.ServiceException; import org.dromara.common.core.exception.SseException; import org.dromara.common.core.exception.base.BaseException; +import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.core.utils.StreamUtils; import org.dromara.common.json.utils.JsonUtils; import org.springframework.boot.json.JsonParseException; @@ -129,7 +130,8 @@ public class GlobalExceptionHandler { @ExceptionHandler(IOException.class) public void handleIoException(IOException e, HttpServletRequest request) { String requestURI = request.getRequestURI(); - if (requestURI.contains("sse")) { + String path = SpringUtils.getProperty("message.path"); + if (requestURI.contains(path)) { // sse 经常性连接中断 例如关闭浏览器 直接屏蔽 return; } diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java deleted file mode 100644 index ef5cfc96f..000000000 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java +++ /dev/null @@ -1,63 +0,0 @@ -package org.dromara.common.websocket.config; - -import cn.hutool.core.util.StrUtil; -import org.dromara.common.websocket.config.properties.WebSocketProperties; -import org.dromara.common.websocket.handler.PlusWebSocketHandler; -import org.dromara.common.websocket.interceptor.PlusWebSocketInterceptor; -import org.dromara.common.websocket.listener.WebSocketTopicListener; -import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.config.annotation.EnableWebSocket; -import org.springframework.web.socket.config.annotation.WebSocketConfigurer; -import org.springframework.web.socket.server.HandshakeInterceptor; - -/** - * WebSocket 配置 - * - * @author zendwang - */ -@AutoConfiguration -@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true") -@EnableConfigurationProperties(WebSocketProperties.class) -@EnableWebSocket -public class WebSocketConfig { - - @Bean - public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, - WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) { - // 如果WebSocket的路径为空,则设置默认路径为 "/websocket" - if (StrUtil.isBlank(webSocketProperties.getPath())) { - webSocketProperties.setPath("/websocket"); - } - - // 如果允许跨域访问的地址为空,则设置为 "*",表示允许所有来源的跨域请求 - if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) { - webSocketProperties.setAllowedOrigins("*"); - } - - // 返回一个WebSocketConfigurer对象,用于配置WebSocket - return registry -> registry - // 添加WebSocket处理程序和拦截器到指定路径,设置允许的跨域来源 - .addHandler(webSocketHandler, webSocketProperties.getPath()) - .addInterceptors(handshakeInterceptor) - .setAllowedOrigins(webSocketProperties.getAllowedOrigins()); - } - - @Bean - public HandshakeInterceptor handshakeInterceptor() { - return new PlusWebSocketInterceptor(); - } - - @Bean - public WebSocketHandler webSocketHandler() { - return new PlusWebSocketHandler(); - } - - @Bean - public WebSocketTopicListener topicListener() { - return new WebSocketTopicListener(); - } -} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java deleted file mode 100644 index d629fe55a..000000000 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.dromara.common.websocket.config.properties; - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; - -/** - * WebSocket 配置项 - * - * @author zendwang - */ -@ConfigurationProperties("websocket") -@Data -public class WebSocketProperties { - - private Boolean enabled; - - /** - * 路径 - */ - private String path; - - /** - * 设置访问源地址 - */ - private String allowedOrigins; -} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java deleted file mode 100644 index e243279d9..000000000 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.dromara.common.websocket.constant; - -/** - * websocket的常量配置 - * - * @author zendwang - */ -public interface WebSocketConstants { - - /** - * websocketSession中的参数的key - */ - String LOGIN_USER_KEY = "loginUser"; - - /** - * 订阅的频道 - */ - String WEB_SOCKET_TOPIC = "global:websocket"; - - /** - * 前端心跳检查的命令 - */ - String PING = "ping"; - - /** - * 服务端心跳恢复的字符串 - */ - String PONG = "pong"; -} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/dto/WebSocketMessageDTO.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/dto/WebSocketMessageDTO.java deleted file mode 100644 index bbf9c66e6..000000000 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/dto/WebSocketMessageDTO.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.dromara.common.websocket.dto; - -import lombok.Data; - -import java.io.Serial; -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -/** - * 消息的dto - * - * @author zendwang - */ -@Data -public class WebSocketMessageDTO implements Serializable { - - @Serial - private static final long serialVersionUID = 1L; - - /** - * 需要推送到的session key 列表 - */ - private List sessionKeys; - - /** - * 需要发送的消息 - */ - private String message; - - /** - * 消息类型 - */ - private String messageType; - - /** - * 消息来源 - */ - private String messageSource; - - /** - * 扩展数据 - */ - private Object data; - - /** - * 前端跳转路径 - */ - private String path; - - /** - * 前端跳转参数 - */ - private Map query; -} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java deleted file mode 100644 index f841dd39c..000000000 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java +++ /dev/null @@ -1,127 +0,0 @@ -package org.dromara.common.websocket.handler; - -import cn.hutool.core.util.ObjectUtil; -import lombok.extern.slf4j.Slf4j; -import org.dromara.common.core.enums.PushSourceEnum; -import org.dromara.common.core.enums.PushTypeEnum; -import org.dromara.common.core.domain.model.LoginUser; -import org.dromara.common.websocket.dto.WebSocketMessageDTO; -import org.dromara.common.websocket.holder.WebSocketSessionHolder; -import org.dromara.common.websocket.utils.WebSocketUtils; -import org.springframework.web.socket.*; -import org.springframework.web.socket.handler.AbstractWebSocketHandler; -import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; - -import java.io.IOException; -import java.util.List; - -import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; - -/** - * WebSocketHandler 实现类 - * - * @author zendwang - */ -@Slf4j -public class PlusWebSocketHandler extends AbstractWebSocketHandler { - - /** - * 连接成功后 - */ - @Override - public void afterConnectionEstablished(WebSocketSession session) throws IOException { - LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); - if (ObjectUtil.isNull(loginUser)) { - session.close(CloseStatus.BAD_DATA); - log.info("[connect] invalid token received. sessionId: {}", session.getId()); - return; - } - WebSocketSessionHolder.addSession(loginUser.getUserId(), new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64000)); - log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); - } - - /** - * 处理接收到的文本消息 - * - * @param session WebSocket会话 - * @param message 接收到的文本消息 - * @throws Exception 处理消息过程中可能抛出的异常 - */ - @Override - protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - // 从WebSocket会话中获取登录用户信息 - LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); - - // 创建WebSocket消息DTO对象 - WebSocketMessageDTO messageDTO = new WebSocketMessageDTO(); - messageDTO.setSessionKeys(List.of(loginUser.getUserId())); - messageDTO.setMessage(message.getPayload()); - messageDTO.setMessageType(PushTypeEnum.CUSTOM.getType()); - messageDTO.setMessageSource(PushSourceEnum.CLIENT.getSource()); - WebSocketUtils.publishMessage(messageDTO); - } - - /** - * 处理接收到的二进制消息 - * - * @param session WebSocket会话 - * @param message 接收到的二进制消息 - * @throws Exception 处理消息过程中可能抛出的异常 - */ - @Override - protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { - super.handleBinaryMessage(session, message); - } - - /** - * 处理接收到的Pong消息(心跳监测) - * - * @param session WebSocket会话 - * @param message 接收到的Pong消息 - * @throws Exception 处理消息过程中可能抛出的异常 - */ - @Override - protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { - WebSocketUtils.sendPongMessage(session); - } - - /** - * 处理WebSocket传输错误 - * - * @param session WebSocket会话 - * @param exception 发生的异常 - * @throws Exception 处理过程中可能抛出的异常 - */ - @Override - public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage()); - } - - /** - * 在WebSocket连接关闭后执行清理操作 - * - * @param session WebSocket会话 - * @param status 关闭状态信息 - */ - @Override - public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { - LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); - if (ObjectUtil.isNull(loginUser)) { - log.info("[disconnect] invalid token received. sessionId: {}", session.getId()); - return; - } - WebSocketSessionHolder.removeSession(loginUser.getUserId()); - log.info("[disconnect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); - } - - /** - * 指示处理程序是否支持接收部分消息 - * - * @return 如果支持接收部分消息,则返回true;否则返回false - */ - @Override - public boolean supportsPartialMessages() { - return false; - } - -} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java deleted file mode 100644 index 9c2372b85..000000000 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java +++ /dev/null @@ -1,74 +0,0 @@ -package org.dromara.common.websocket.holder; - -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import org.springframework.web.socket.CloseStatus; -import org.springframework.web.socket.WebSocketSession; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * WebSocketSession 用于保存当前所有在线的会话信息 - * - * @author zendwang - */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public class WebSocketSessionHolder { - - private static final Map USER_SESSION_MAP = new ConcurrentHashMap<>(); - - /** - * 将WebSocket会话添加到用户会话Map中 - * - * @param sessionKey 会话键,用于检索会话 - * @param session 要添加的WebSocket会话 - */ - public static void addSession(Long sessionKey, WebSocketSession session) { - removeSession(sessionKey); - USER_SESSION_MAP.put(sessionKey, session); - } - - /** - * 从用户会话Map中移除指定会话键对应的WebSocket会话 - * - * @param sessionKey 要移除的会话键 - */ - public static void removeSession(Long sessionKey) { - WebSocketSession session = USER_SESSION_MAP.remove(sessionKey); - try { - session.close(CloseStatus.BAD_DATA); - } catch (Exception ignored) { - } - } - - /** - * 根据会话键从用户会话Map中获取WebSocket会话 - * - * @param sessionKey 要获取的会话键 - * @return 与给定会话键对应的WebSocket会话,如果不存在则返回null - */ - public static WebSocketSession getSessions(Long sessionKey) { - return USER_SESSION_MAP.get(sessionKey); - } - - /** - * 获取存储在用户会话Map中所有WebSocket会话的会话键集合 - * - * @return 所有WebSocket会话的会话键集合 - */ - public static Set getSessionsAll() { - return USER_SESSION_MAP.keySet(); - } - - /** - * 检查给定的会话键是否存在于用户会话Map中 - * - * @param sessionKey 要检查的会话键 - * @return 如果存在对应的会话键,则返回true;否则返回false - */ - public static Boolean existSession(Long sessionKey) { - return USER_SESSION_MAP.containsKey(sessionKey); - } -} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java deleted file mode 100644 index 002d99a16..000000000 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.dromara.common.websocket.listener; - -import cn.hutool.core.collection.CollUtil; -import lombok.extern.slf4j.Slf4j; -import org.dromara.common.websocket.holder.WebSocketSessionHolder; -import org.dromara.common.websocket.utils.WebSocketUtils; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.core.Ordered; - -/** - * WebSocket 主题订阅监听器 - * - * @author zendwang - */ -@Slf4j -public class WebSocketTopicListener implements ApplicationRunner, Ordered { - - /** - * 在Spring Boot应用程序启动时初始化WebSocket主题订阅监听器 - * - * @param args 应用程序参数 - * @throws Exception 初始化过程中可能抛出的异常 - */ - @Override - public void run(ApplicationArguments args) throws Exception { - // 订阅WebSocket消息 - WebSocketUtils.subscribeMessage((message) -> { - log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage()); - // 如果key不为空就按照key发消息 如果为空就群发 - if (CollUtil.isNotEmpty(message.getSessionKeys())) { - message.getSessionKeys().forEach(key -> { - if (WebSocketSessionHolder.existSession(key)) { - WebSocketUtils.sendMessage(key, message); - } - }); - } else { - WebSocketSessionHolder.getSessionsAll().forEach(key -> { - WebSocketUtils.sendMessage(key, message); - }); - } - }); - log.info("初始化WebSocket主题订阅监听器成功"); - } - - @Override - public int getOrder() { - return -1; - } -} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java deleted file mode 100644 index 9d476fdd4..000000000 --- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java +++ /dev/null @@ -1,189 +0,0 @@ -package org.dromara.common.websocket.utils; - -import cn.hutool.core.collection.CollUtil; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.dromara.common.core.domain.dto.PushPayload; -import org.dromara.common.json.utils.JsonUtils; -import org.dromara.common.redis.utils.RedisUtils; -import org.dromara.common.websocket.dto.WebSocketMessageDTO; -import org.dromara.common.websocket.holder.WebSocketSessionHolder; -import org.springframework.web.socket.PongMessage; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketMessage; -import org.springframework.web.socket.WebSocketSession; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; - -import static org.dromara.common.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC; - -/** - * 工具类 - * - * @author zendwang - */ -@Slf4j -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public class WebSocketUtils { - - /** - * 向指定会话标识发送文本消息。 - * - * @param sessionKey 要发送消息的用户id - * @param message 要发送的消息内容 - */ - public static void sendMessage(Long sessionKey, String message) { - WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey); - sendMessage(session, message); - } - - /** - * 向指定会话标识发送统一 JSON 消息。 - * - * @param sessionKey 要发送消息的用户id - * @param webSocketMessage 要发送的消息内容 - */ - public static void sendMessage(Long sessionKey, WebSocketMessageDTO webSocketMessage) { - WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey); - sendMessage(session, buildPayload(webSocketMessage)); - } - - /** - * 订阅 WebSocket 广播主题消息。 - * - * @param consumer 处理WebSocket消息的消费者函数 - */ - public static void subscribeMessage(Consumer consumer) { - RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDTO.class, consumer); - } - - /** - * 按会话标识发布 WebSocket 消息。 - * - * @param webSocketMessage 要发布的WebSocket消息对象 - */ - public static void publishMessage(WebSocketMessageDTO webSocketMessage) { - List unsentSessionKeys = new ArrayList<>(); - // 当前服务内session,直接发送消息 - for (Long sessionKey : webSocketMessage.getSessionKeys()) { - if (WebSocketSessionHolder.existSession(sessionKey)) { - WebSocketUtils.sendMessage(sessionKey, webSocketMessage); - continue; - } - unsentSessionKeys.add(sessionKey); - } - // 不在当前服务内session,发布订阅消息 - if (CollUtil.isNotEmpty(unsentSessionKeys)) { - WebSocketMessageDTO broadcastMessage = new WebSocketMessageDTO(); - broadcastMessage.setSessionKeys(unsentSessionKeys); - broadcastMessage.setMessage(webSocketMessage.getMessage()); - RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { - log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}", - WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage()); - }); - } - } - - /** - * 向所有 WebSocket 会话发布广播消息。 - * - * @param message 要发布的消息内容 - */ - public static void publishAll(String message) { - WebSocketMessageDTO broadcastMessage = new WebSocketMessageDTO(); - broadcastMessage.setMessage(message); - RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { - log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message); - }); - } - - /** - * 向指定会话标识发布统一 JSON 消息。 - * - * @param sessionKeys 目标会话 - * @param payload 消息体 - */ - public static void publishMessage(List sessionKeys, PushPayload payload) { - WebSocketMessageDTO dto = new WebSocketMessageDTO(); - dto.setSessionKeys(sessionKeys); - dto.setMessage(payload.getMessage()); - dto.setMessageType(payload.getType()); - dto.setMessageSource(payload.getSource()); - dto.setData(payload.getData()); - dto.setPath(payload.getPath()); - dto.setQuery(payload.getQuery()); - publishMessage(dto); - } - - /** - * 向所有 WebSocket 会话发布统一 JSON 消息。 - * - * @param payload 消息体 - */ - public static void publishAll(PushPayload payload) { - WebSocketMessageDTO broadcastMessage = new WebSocketMessageDTO(); - broadcastMessage.setMessage(payload.getMessage()); - broadcastMessage.setMessageType(payload.getType()); - broadcastMessage.setMessageSource(payload.getSource()); - broadcastMessage.setData(payload.getData()); - broadcastMessage.setPath(payload.getPath()); - broadcastMessage.setQuery(payload.getQuery()); - RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { - log.info("WebSocket发送主题订阅消息topic:{} type:{} source:{} message:{}", - WEB_SOCKET_TOPIC, payload.getType(), payload.getSource(), payload.getMessage()); - }); - } - - /** - * 向指定会话发送 Pong 心跳消息。 - * - * @param session 要发送Pong消息的WebSocket会话 - */ - public static void sendPongMessage(WebSocketSession session) { - sendMessage(session, new PongMessage()); - } - - /** - * 向指定 WebSocket 会话发送文本消息。 - * - * @param session WebSocket会话 - * @param message 要发送的文本消息内容 - */ - public static void sendMessage(WebSocketSession session, String message) { - sendMessage(session, new TextMessage(message)); - } - - private static String buildPayload(WebSocketMessageDTO webSocketMessage) { - PushPayload payload = PushPayload.of( - webSocketMessage.getMessageType(), - webSocketMessage.getMessageSource(), - webSocketMessage.getMessage(), - webSocketMessage.getData() - ); - payload.setPath(webSocketMessage.getPath()); - payload.setQuery(webSocketMessage.getQuery()); - return JsonUtils.toJsonString(payload); - } - - /** - * 向指定 WebSocket 会话发送原始消息对象。 - * - * @param session WebSocket会话 - * @param message 要发送的WebSocket消息对象 - */ - private static void sendMessage(WebSocketSession session, WebSocketMessage message) { - if (session == null || !session.isOpen()) { - log.warn("[send] session会话已经关闭"); - } else { - try { - session.sendMessage(message); - } catch (IOException e) { - log.error("[send] session({}) 发送消息({}) 异常", session, message, e); - } - } - } -} diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports deleted file mode 100644 index c3a7305a3..000000000 --- a/ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ /dev/null @@ -1 +0,0 @@ -org.dromara.common.websocket.config.WebSocketConfig diff --git a/ruoyi-modules/ruoyi-demo/pom.xml b/ruoyi-modules/ruoyi-demo/pom.xml index c61fb47a0..ec948ceee 100644 --- a/ruoyi-modules/ruoyi-demo/pom.xml +++ b/ruoyi-modules/ruoyi-demo/pom.xml @@ -85,7 +85,7 @@ org.dromara - ruoyi-common-websocket + ruoyi-common-push diff --git a/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/controller/WebSocketController.java b/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/controller/WebSocketController.java index 6497ee5d7..92c738a71 100644 --- a/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/controller/WebSocketController.java +++ b/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/controller/WebSocketController.java @@ -1,14 +1,18 @@ package org.dromara.demo.controller; import org.dromara.common.core.domain.R; -import org.dromara.common.websocket.dto.WebSocketMessageDTO; -import org.dromara.common.websocket.utils.WebSocketUtils; +import org.dromara.common.core.domain.dto.PushPayload; +import org.dromara.common.core.enums.PushSourceEnum; +import org.dromara.common.core.enums.PushTypeEnum; +import org.dromara.common.push.helper.PushHelper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import java.util.List; + /** * WebSocket 演示案例 * @@ -23,11 +27,22 @@ public class WebSocketController { /** * 发布消息 * - * @param dto 发送内容 + * @param userId 目标用户 + * @param message 发送内容 */ @GetMapping("/send") - public R send(WebSocketMessageDTO dto) throws InterruptedException { - WebSocketUtils.publishMessage(dto); + public R send(Long userId, String message) { + PushPayload payload = PushPayload.of( + PushTypeEnum.MESSAGE, + PushSourceEnum.BACKEND, + message, + null + ); + if (userId == null) { + PushHelper.publishAll(payload); + } else { + PushHelper.publishMessage(List.of(userId), payload); + } return R.ok("操作成功"); } } diff --git a/ruoyi-modules/ruoyi-system/pom.xml b/ruoyi-modules/ruoyi-system/pom.xml index a24883dff..eaa6ab0c4 100644 --- a/ruoyi-modules/ruoyi-system/pom.xml +++ b/ruoyi-modules/ruoyi-system/pom.xml @@ -82,12 +82,7 @@ org.dromara - ruoyi-common-websocket - - - - org.dromara - ruoyi-common-sse + ruoyi-common-push diff --git a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/controller/system/SysNoticeController.java b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/controller/system/SysNoticeController.java index e7d8d7ecd..53ec668c4 100644 --- a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/controller/system/SysNoticeController.java +++ b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/controller/system/SysNoticeController.java @@ -4,15 +4,15 @@ import cn.dev33.satoken.annotation.SaCheckPermission; import lombok.RequiredArgsConstructor; import org.dromara.common.core.domain.PageResult; import org.dromara.common.core.domain.R; +import org.dromara.common.core.domain.dto.PushPayload; import org.dromara.common.core.enums.PushSourceEnum; import org.dromara.common.core.enums.PushTypeEnum; import org.dromara.common.core.service.DictService; import org.dromara.common.log.annotation.Log; import org.dromara.common.log.enums.BusinessType; import org.dromara.common.mybatis.core.page.PageQuery; +import org.dromara.common.push.helper.PushHelper; import org.dromara.common.redis.annotation.RepeatSubmit; -import org.dromara.common.sse.dto.SseMessageDTO; -import org.dromara.common.sse.utils.SseMessageUtils; import org.dromara.common.web.core.BaseController; import org.dromara.system.domain.bo.SysNoticeBo; import org.dromara.system.domain.vo.SysNoticeVo; @@ -82,13 +82,14 @@ public class SysNoticeController extends BaseController { data.put("noticeType", notice.getNoticeType()); data.put("noticeTypeLabel", type); data.put("noticeTitle", notice.getNoticeTitle()); - SseMessageDTO dto = new SseMessageDTO(); - dto.setMessage("[" + type + "] " + notice.getNoticeTitle()); - dto.setMessageType(PushTypeEnum.NOTICE.getType()); - dto.setMessageSource(PushSourceEnum.NOTICE.getSource()); - dto.setData(data); - dto.setPath("/system/notice"); - SseMessageUtils.publishMessage(dto); + PushHelper.publishAll(PushPayload.of( + PushTypeEnum.NOTICE, + PushSourceEnum.NOTICE, + "[" + type + "] " + notice.getNoticeTitle(), + data, + "/system/notice", + null + )); return R.ok(); } diff --git a/ruoyi-modules/ruoyi-workflow/pom.xml b/ruoyi-modules/ruoyi-workflow/pom.xml index 9034f9688..40d6b072b 100644 --- a/ruoyi-modules/ruoyi-workflow/pom.xml +++ b/ruoyi-modules/ruoyi-workflow/pom.xml @@ -20,7 +20,7 @@ org.dromara - ruoyi-common-sse + ruoyi-common-push diff --git a/ruoyi-modules/ruoyi-workflow/src/main/java/org/dromara/workflow/service/impl/FlwCommonServiceImpl.java b/ruoyi-modules/ruoyi-workflow/src/main/java/org/dromara/workflow/service/impl/FlwCommonServiceImpl.java index f0797d228..d10a05717 100644 --- a/ruoyi-modules/ruoyi-workflow/src/main/java/org/dromara/workflow/service/impl/FlwCommonServiceImpl.java +++ b/ruoyi-modules/ruoyi-workflow/src/main/java/org/dromara/workflow/service/impl/FlwCommonServiceImpl.java @@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.domain.dto.PushPayload; import org.dromara.common.core.domain.dto.UserDTO; import org.dromara.common.core.enums.PushSourceEnum; import org.dromara.common.core.enums.PushTypeEnum; @@ -12,8 +13,7 @@ import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.core.utils.StreamUtils; import org.dromara.common.core.utils.StringUtils; import org.dromara.common.mail.utils.MailUtils; -import org.dromara.common.sse.dto.SseMessageDTO; -import org.dromara.common.sse.utils.SseMessageUtils; +import org.dromara.common.push.helper.PushHelper; import org.dromara.warm.flow.core.FlowEngine; import org.dromara.warm.flow.core.entity.Node; import org.dromara.warm.flow.orm.entity.FlowTask; @@ -94,12 +94,12 @@ public class FlwCommonServiceImpl implements IFlwCommonService { try { switch (messageTypeEnum) { case SYSTEM_MESSAGE -> { - SseMessageDTO dto = new SseMessageDTO(); - dto.setUserIds(userIds); - dto.setMessage(message); - dto.setMessageType(PushTypeEnum.MESSAGE.getType()); - dto.setMessageSource(PushSourceEnum.WORKFLOW.getSource()); - SseMessageUtils.publishMessage(dto); + PushHelper.publishMessage(userIds, PushPayload.of( + PushTypeEnum.MESSAGE, + PushSourceEnum.WORKFLOW, + message, + null + )); } case EMAIL_MESSAGE -> MailUtils.sendText(emails, subject, message); case SMS_MESSAGE -> {