From 029f6a4c116c2a41c8ef11ff189448436490e17b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=96=AF=E7=8B=82=E7=9A=84=E7=8B=AE=E5=AD=90Li?=
<15040126243@163.com>
Date: Thu, 26 Mar 2026 17:25:36 +0800
Subject: [PATCH] =?UTF-8?q?update=20=E9=87=8D=E6=9E=84=20common-sse=20?=
=?UTF-8?q?=E4=B8=8E=20common-websocket=20=E5=90=88=E5=B9=B6=E4=B8=BA=20ru?=
=?UTF-8?q?oyi-common-push=20=E6=8E=A8=E9=80=81=E6=A8=A1=E5=9D=97?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../web/controller/AuthController.java | 19 +-
.../src/main/resources/application.yml | 18 +-
ruoyi-common/pom.xml | 3 +-
ruoyi-common/ruoyi-common-bom/pom.xml | 11 +-
.../pom.xml | 4 +-
.../push/config/MessageAutoConfiguration.java | 17 ++
.../push/config/MessageSseConfiguration.java | 33 +++
.../config/MessageWebSocketConfiguration.java | 55 +++++
.../push/constant/MessageConstants.java | 19 ++
.../push}/controller/SseController.java | 28 ++-
.../common/push/core/PushSessionManager.java | 24 +++
.../push/core/SseEmitterSessionManager.java} | 82 +++-----
.../push/core/WebSocketSessionManager.java | 168 ++++++++++++++++
.../org/dromara/common/push/dto/PushDTO.java | 30 +++
.../push/enums/MessageTransportEnum.java | 32 +++
.../push/handler/PlusWebSocketHandler.java | 104 ++++++++++
.../common/push/helper/PushHelper.java | 80 ++++++++
.../interceptor/PlusWebSocketInterceptor.java | 46 ++---
.../push/listener/MessageTopicListener.java | 44 ++++
.../push/properties/MessageProperties.java | 35 ++++
...ot.autoconfigure.AutoConfiguration.imports | 3 +
.../security/config/SecurityConfig.java | 6 +-
ruoyi-common/ruoyi-common-sse/pom.xml | 36 ----
.../sse/config/SseAutoConfiguration.java | 36 ----
.../common/sse/config/SseProperties.java | 21 --
.../dromara/common/sse/dto/SseMessageDTO.java | 55 -----
.../common/sse/listener/SseTopicListener.java | 55 -----
.../common/sse/utils/SseMessageUtils.java | 153 --------------
...ot.autoconfigure.AutoConfiguration.imports | 1 -
.../web/handler/GlobalExceptionHandler.java | 4 +-
.../websocket/config/WebSocketConfig.java | 63 ------
.../properties/WebSocketProperties.java | 26 ---
.../constant/WebSocketConstants.java | 29 ---
.../websocket/dto/WebSocketMessageDTO.java | 55 -----
.../handler/PlusWebSocketHandler.java | 127 ------------
.../holder/WebSocketSessionHolder.java | 74 -------
.../listener/WebSocketTopicListener.java | 50 -----
.../websocket/utils/WebSocketUtils.java | 189 ------------------
...ot.autoconfigure.AutoConfiguration.imports | 1 -
ruoyi-modules/ruoyi-demo/pom.xml | 2 +-
.../demo/controller/WebSocketController.java | 25 ++-
ruoyi-modules/ruoyi-system/pom.xml | 7 +-
.../system/SysNoticeController.java | 19 +-
ruoyi-modules/ruoyi-workflow/pom.xml | 2 +-
.../service/impl/FlwCommonServiceImpl.java | 16 +-
45 files changed, 775 insertions(+), 1132 deletions(-)
rename ruoyi-common/{ruoyi-common-websocket => ruoyi-common-push}/pom.xml (94%)
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageAutoConfiguration.java
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/constant/MessageConstants.java
rename ruoyi-common/{ruoyi-common-sse/src/main/java/org/dromara/common/sse => ruoyi-common-push/src/main/java/org/dromara/common/push}/controller/SseController.java (71%)
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/PushSessionManager.java
rename ruoyi-common/{ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java => ruoyi-common-push/src/main/java/org/dromara/common/push/core/SseEmitterSessionManager.java} (75%)
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/dto/PushDTO.java
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/enums/MessageTransportEnum.java
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java
rename ruoyi-common/{ruoyi-common-websocket/src/main/java/org/dromara/common/websocket => ruoyi-common-push/src/main/java/org/dromara/common/push}/interceptor/PlusWebSocketInterceptor.java (53%)
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/properties/MessageProperties.java
create mode 100644 ruoyi-common/ruoyi-common-push/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
delete mode 100644 ruoyi-common/ruoyi-common-sse/pom.xml
delete mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java
delete mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
delete mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDTO.java
delete mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java
delete mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java
delete mode 100644 ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
delete mode 100644 ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java
delete mode 100644 ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java
delete mode 100644 ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java
delete mode 100644 ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/dto/WebSocketMessageDTO.java
delete mode 100644 ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java
delete mode 100644 ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java
delete mode 100644 ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java
delete mode 100644 ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java
delete mode 100644 ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
diff --git a/ruoyi-admin/src/main/java/org/dromara/web/controller/AuthController.java b/ruoyi-admin/src/main/java/org/dromara/web/controller/AuthController.java
index fc524645a..f5fdfac2b 100644
--- a/ruoyi-admin/src/main/java/org/dromara/web/controller/AuthController.java
+++ b/ruoyi-admin/src/main/java/org/dromara/web/controller/AuthController.java
@@ -12,9 +12,12 @@ import me.zhyd.oauth.request.AuthRequest;
import me.zhyd.oauth.utils.AuthStateUtils;
import org.dromara.common.core.constant.SystemConstants;
import org.dromara.common.core.domain.R;
+import org.dromara.common.core.domain.dto.PushPayload;
import org.dromara.common.core.domain.model.LoginBody;
import org.dromara.common.core.domain.model.RegisterBody;
import org.dromara.common.core.domain.model.SocialLoginBody;
+import org.dromara.common.core.enums.PushSourceEnum;
+import org.dromara.common.core.enums.PushTypeEnum;
import org.dromara.common.core.utils.DateUtils;
import org.dromara.common.core.utils.MessageUtils;
import org.dromara.common.core.utils.StringUtils;
@@ -23,12 +26,11 @@ import org.dromara.common.encrypt.annotation.ApiEncrypt;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.redis.annotation.RateLimiter;
import org.dromara.common.redis.enums.LimitType;
+import org.dromara.common.push.helper.PushHelper;
import org.dromara.common.satoken.utils.LoginHelper;
import org.dromara.common.social.config.properties.SocialLoginConfigProperties;
import org.dromara.common.social.config.properties.SocialProperties;
import org.dromara.common.social.utils.SocialUtils;
-import org.dromara.common.sse.dto.SseMessageDTO;
-import org.dromara.common.sse.utils.SseMessageUtils;
import org.dromara.system.domain.vo.SysClientVo;
import org.dromara.system.service.ISysClientService;
import org.dromara.system.service.ISysConfigService;
@@ -93,10 +95,15 @@ public class AuthController {
Long userId = LoginHelper.getUserId();
scheduledExecutorService.schedule(() -> {
- SseMessageDTO dto = new SseMessageDTO();
- dto.setUserIds(List.of(userId));
- dto.setMessage(DateUtils.getTodayHour(new Date()) + "好,欢迎登录 RuoYi-Vue-Plus 后台管理系统");
- SseMessageUtils.publishMessage(dto);
+ PushHelper.publishMessage(
+ List.of(userId),
+ PushPayload.of(
+ PushTypeEnum.MESSAGE,
+ PushSourceEnum.BACKEND,
+ DateUtils.getTodayHour(new Date()) + "好,欢迎登录 RuoYi-Vue-Plus 后台管理系统",
+ null
+ )
+ );
}, 5, TimeUnit.SECONDS);
return R.ok(loginVo);
}
diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml
index b58c97a42..8f437e925 100644
--- a/ruoyi-admin/src/main/resources/application.yml
+++ b/ruoyi-admin/src/main/resources/application.yml
@@ -213,18 +213,14 @@ management:
logfile:
external-file: ./logs/sys-console.log
---- # 默认/推荐使用sse推送
-sse:
+--- # 统一消息推送配置
+message:
enabled: true
- path: /resource/sse
-
---- # websocket
-websocket:
- # 如果关闭 需要和前端开关一起关闭
- enabled: false
- # 路径
- path: /resource/websocket
- # 设置访问源地址
+ # sse / websocket
+ transport: sse
+ # 统一访问路径
+ path: /resource/message
+ # websocket 允许的跨域来源
allowedOrigins: '*'
--- # warm-flow工作流配置
diff --git a/ruoyi-common/pom.xml b/ruoyi-common/pom.xml
index 8d779e122..f6a30c47f 100644
--- a/ruoyi-common/pom.xml
+++ b/ruoyi-common/pom.xml
@@ -35,8 +35,7 @@
ruoyi-common-sensitive
ruoyi-common-json
ruoyi-common-encrypt
- ruoyi-common-websocket
- ruoyi-common-sse
+ ruoyi-common-push
ruoyi-common-mqtt
diff --git a/ruoyi-common/ruoyi-common-bom/pom.xml b/ruoyi-common/ruoyi-common-bom/pom.xml
index 4f025fdc4..9eb04dd77 100644
--- a/ruoyi-common/ruoyi-common-bom/pom.xml
+++ b/ruoyi-common/ruoyi-common-bom/pom.xml
@@ -144,17 +144,10 @@
${revision}
-
+
org.dromara
- ruoyi-common-websocket
- ${revision}
-
-
-
-
- org.dromara
- ruoyi-common-sse
+ ruoyi-common-push
${revision}
diff --git a/ruoyi-common/ruoyi-common-websocket/pom.xml b/ruoyi-common/ruoyi-common-push/pom.xml
similarity index 94%
rename from ruoyi-common/ruoyi-common-websocket/pom.xml
rename to ruoyi-common/ruoyi-common-push/pom.xml
index 0587cd79a..4d888e05f 100644
--- a/ruoyi-common/ruoyi-common-websocket/pom.xml
+++ b/ruoyi-common/ruoyi-common-push/pom.xml
@@ -9,10 +9,10 @@
4.0.0
- ruoyi-common-websocket
+ ruoyi-common-push
- ruoyi-common-websocket 模块
+ ruoyi-common-push 模块
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageAutoConfiguration.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageAutoConfiguration.java
new file mode 100644
index 000000000..7ea031130
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageAutoConfiguration.java
@@ -0,0 +1,17 @@
+package org.dromara.common.push.config;
+
+import org.dromara.common.push.properties.MessageProperties;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+/**
+ * 统一消息推送公共自动装配。
+ *
+ * @author Lion Li
+ */
+@AutoConfiguration
+@ConditionalOnProperty(prefix = "message", name = "enabled", havingValue = "true", matchIfMissing = true)
+@EnableConfigurationProperties(MessageProperties.class)
+public class MessageAutoConfiguration {
+}
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java
new file mode 100644
index 000000000..02b9d78ff
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java
@@ -0,0 +1,33 @@
+package org.dromara.common.push.config;
+
+import org.dromara.common.push.controller.SseController;
+import org.dromara.common.push.core.SseEmitterSessionManager;
+import org.dromara.common.push.listener.MessageTopicListener;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+
+/**
+ * SSE 消息推送自动装配。
+ *
+ * @author Lion Li
+ */
+@AutoConfiguration(after = MessageAutoConfiguration.class)
+@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "sse", matchIfMissing = true)
+public class MessageSseConfiguration {
+
+ @Bean
+ public SseEmitterSessionManager sseEmitterManager() {
+ return new SseEmitterSessionManager();
+ }
+
+ @Bean
+ public MessageTopicListener messageTopicListener(SseEmitterSessionManager manager) {
+ return new MessageTopicListener(manager);
+ }
+
+ @Bean
+ public SseController sseController(SseEmitterSessionManager manager) {
+ return new SseController(manager);
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java
new file mode 100644
index 000000000..b1bb6ec2e
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java
@@ -0,0 +1,55 @@
+package org.dromara.common.push.config;
+
+import org.dromara.common.push.listener.MessageTopicListener;
+import org.dromara.common.push.core.WebSocketSessionManager;
+import org.dromara.common.push.handler.PlusWebSocketHandler;
+import org.dromara.common.push.interceptor.PlusWebSocketInterceptor;
+import org.dromara.common.push.properties.MessageProperties;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.server.HandshakeInterceptor;
+
+/**
+ * WebSocket 消息推送自动装配。
+ *
+ * @author Lion Li
+ */
+@EnableWebSocket
+@AutoConfiguration(after = MessageAutoConfiguration.class)
+@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "websocket")
+public class MessageWebSocketConfiguration {
+
+ @Bean
+ public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor,
+ WebSocketHandler webSocketHandler,
+ MessageProperties messageProperties) {
+ return registry -> registry
+ .addHandler(webSocketHandler, messageProperties.getPath())
+ .addInterceptors(handshakeInterceptor)
+ .setAllowedOrigins(messageProperties.getAllowedOrigins());
+ }
+
+ @Bean
+ public WebSocketSessionManager webSocketSessionManager() {
+ return new WebSocketSessionManager();
+ }
+
+ @Bean
+ public HandshakeInterceptor handshakeInterceptor() {
+ return new PlusWebSocketInterceptor();
+ }
+
+ @Bean
+ public WebSocketHandler webSocketHandler(WebSocketSessionManager webSocketSessionManager) {
+ return new PlusWebSocketHandler(webSocketSessionManager);
+ }
+
+ @Bean
+ public MessageTopicListener messageTopicListener(WebSocketSessionManager webSocketSessionManager) {
+ return new MessageTopicListener(webSocketSessionManager);
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/constant/MessageConstants.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/constant/MessageConstants.java
new file mode 100644
index 000000000..3f66957af
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/constant/MessageConstants.java
@@ -0,0 +1,19 @@
+package org.dromara.common.push.constant;
+
+/**
+ * 模块通用消息常量定义。
+ *
+ * @author Lion Li
+ */
+public interface MessageConstants {
+
+ String LOGIN_USER_KEY = "loginUser";
+
+ String LOGIN_TOKEN_KEY = "token";
+
+ String MESSAGE_TOPIC = "global:message";
+
+ String PING = "ping";
+
+ String PONG = "pong";
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/controller/SseController.java
similarity index 71%
rename from ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java
rename to ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/controller/SseController.java
index 2967d6736..ce42857a8 100644
--- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/controller/SseController.java
@@ -1,13 +1,12 @@
-package org.dromara.common.sse.controller;
+package org.dromara.common.push.controller;
import cn.dev33.satoken.annotation.SaIgnore;
import cn.dev33.satoken.stp.StpUtil;
import lombok.RequiredArgsConstructor;
import org.dromara.common.core.domain.R;
+import org.dromara.common.push.core.SseEmitterSessionManager;
import org.dromara.common.satoken.utils.LoginHelper;
-import org.dromara.common.sse.core.SseEmitterManager;
import org.springframework.beans.factory.DisposableBean;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -19,25 +18,24 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
* @author Lion Li
*/
@RestController
-@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
@RequiredArgsConstructor
public class SseController implements DisposableBean {
- private final SseEmitterManager sseEmitterManager;
+ private final SseEmitterSessionManager sessionManager;
/**
* 建立当前登录用户的 SSE 连接。
*
* @return SSE 发射器
*/
- @GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ @GetMapping(value = "${message.path:/resource/message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect() {
if (!StpUtil.isLogin()) {
return null;
}
String tokenValue = StpUtil.getTokenValue();
Long userId = LoginHelper.getUserId();
- return sseEmitterManager.connect(userId, tokenValue);
+ return sessionManager.connect(userId, tokenValue);
}
/**
@@ -46,11 +44,11 @@ public class SseController implements DisposableBean {
* @return 操作结果
*/
@SaIgnore
- @GetMapping(value = "${sse.path}/close")
+ @GetMapping(value = "${message.path:/resource/message}/close")
public R close() {
String tokenValue = StpUtil.getTokenValue();
Long userId = LoginHelper.getUserId();
- sseEmitterManager.disconnect(userId, tokenValue);
+ sessionManager.disconnect(userId, tokenValue);
return R.ok();
}
@@ -61,12 +59,12 @@ public class SseController implements DisposableBean {
// * @param userId 目标用户的 ID
// * @param msg 要发送的消息内容
// */
-// @GetMapping(value = "${sse.path}/send")
+// @GetMapping(value = "${message.path:/resource/message}/send")
// public R send(Long userId, String msg) {
-// SseMessageDTO dto = new SseMessageDTO();
+// PushDTO dto = new PushDTO();
// dto.setUserIds(List.of(userId));
-// dto.setMessage(msg);
-// sseEmitterManager.publishMessage(dto);
+// dto.setPayload(PushPayload.of("message", "backend", msg, null));
+// sessionManager.publishMessage(dto);
// return R.ok();
// }
//
@@ -75,9 +73,9 @@ public class SseController implements DisposableBean {
// *
// * @param msg 要发送的消息内容
// */
-// @GetMapping(value = "${sse.path}/sendAll")
+// @GetMapping(value = "${message.path:/resource/message}/sendAll")
// public R send(String msg) {
-// sseEmitterManager.publishAll(msg);
+// sessionManager.publishAll(msg);
// return R.ok();
// }
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/PushSessionManager.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/PushSessionManager.java
new file mode 100644
index 000000000..384a83ec7
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/PushSessionManager.java
@@ -0,0 +1,24 @@
+package org.dromara.common.push.core;
+
+import org.dromara.common.core.domain.dto.PushPayload;
+import org.dromara.common.push.dto.PushDTO;
+
+import java.util.function.Consumer;
+
+/**
+ * 统一推送会话管理器。
+ *
+ * @author Lion Li
+ */
+public interface PushSessionManager {
+
+ void subscribeMessage(Consumer consumer);
+
+ void sendMessage(Long userId, PushPayload payload);
+
+ void sendMessage(PushPayload payload);
+
+ void publishMessage(PushDTO pushDTO);
+
+ void publishAll(PushPayload payload);
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/SseEmitterSessionManager.java
similarity index 75%
rename from ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
rename to ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/SseEmitterSessionManager.java
index 0540096a1..042e0759d 100644
--- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/SseEmitterSessionManager.java
@@ -1,13 +1,14 @@
-package org.dromara.common.sse.core;
+package org.dromara.common.push.core;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.dto.PushPayload;
+import org.dromara.common.push.constant.MessageConstants;
+import org.dromara.common.push.dto.PushDTO;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.redis.utils.RedisUtils;
-import org.dromara.common.sse.dto.SseMessageDTO;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
@@ -25,16 +26,11 @@ import java.util.function.Consumer;
* @author Lion Li
*/
@Slf4j
-public class SseEmitterManager {
-
- /**
- * 订阅的频道
- */
- private final static String SSE_TOPIC = "global:sse";
+public class SseEmitterSessionManager implements PushSessionManager {
private final static Map> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
- public SseEmitterManager() {
+ public SseEmitterSessionManager() {
// 定时执行 SSE 心跳检测
SpringUtils.getBean(ScheduledExecutorService.class)
.scheduleWithFixedDelay(this::sseMonitor, 60L, 60L, TimeUnit.SECONDS);
@@ -160,8 +156,9 @@ public class SseEmitterManager {
*
* @param consumer 处理SSE消息的消费者函数
*/
- public void subscribeMessage(Consumer consumer) {
- RedisUtils.subscribe(SSE_TOPIC, SseMessageDTO.class, consumer);
+ @Override
+ public void subscribeMessage(Consumer consumer) {
+ RedisUtils.subscribe(MessageConstants.MESSAGE_TOPIC, PushDTO.class, consumer);
}
/**
@@ -196,6 +193,7 @@ public class SseEmitterManager {
* @param userId 要发送消息的用户id
* @param payload 要发送的消息体
*/
+ @Override
public void sendMessage(Long userId, PushPayload payload) {
sendMessage(userId, JsonUtils.toJsonString(payload));
}
@@ -204,10 +202,13 @@ public class SseEmitterManager {
* 向指定用户的全部本地 SSE 会话发送统一 JSON 消息。
*
* @param userId 要发送消息的用户id
- * @param sseMessageDTO 要发送的消息内容
+ * @param pushDTO 要发送的消息内容
*/
- public void sendMessage(Long userId, SseMessageDTO sseMessageDTO) {
- sendMessage(userId, buildPayload(sseMessageDTO));
+ public void sendMessage(Long userId, PushDTO pushDTO) {
+ if (pushDTO == null) {
+ return;
+ }
+ sendMessage(userId, pushDTO.getPayload());
}
/**
@@ -226,6 +227,7 @@ public class SseEmitterManager {
*
* @param payload 要发送的消息体
*/
+ @Override
public void sendMessage(PushPayload payload) {
sendMessage(JsonUtils.toJsonString(payload));
}
@@ -233,16 +235,16 @@ public class SseEmitterManager {
/**
* 发布 SSE 订阅消息。
*
- * @param sseMessageDTO 要发布的SSE消息对象
+ * @param pushDTO 要发布的SSE消息对象
*/
- public void publishMessage(SseMessageDTO sseMessageDTO) {
- SseMessageDTO broadcastMessage = new SseMessageDTO();
- broadcastMessage.setUserIds(sseMessageDTO.getUserIds());
- broadcastMessage.setMessage(sseMessageDTO.getMessage());
- RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
- log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
- SSE_TOPIC, sseMessageDTO.getUserIds(), sseMessageDTO.getMessage());
- });
+ @Override
+ public void publishMessage(PushDTO pushDTO) {
+ RedisUtils.publish(MessageConstants.MESSAGE_TOPIC, pushDTO, consumer -> log.info(
+ "发送主题订阅消息topic:{} userIds:{} message:{}",
+ MessageConstants.MESSAGE_TOPIC,
+ pushDTO.getUserIds(),
+ pushDTO.getPayload() == null ? null : pushDTO.getPayload().getMessage()
+ ));
}
/**
@@ -251,11 +253,7 @@ public class SseEmitterManager {
* @param message 要发布的消息内容
*/
public void publishAll(String message) {
- SseMessageDTO broadcastMessage = new SseMessageDTO();
- broadcastMessage.setMessage(message);
- RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
- log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message);
- });
+ publishAll(PushPayload.of("message", "backend", message, null));
}
/**
@@ -263,29 +261,13 @@ public class SseEmitterManager {
*
* @param payload 要发布的消息体
*/
+ @Override
public void publishAll(PushPayload payload) {
- SseMessageDTO broadcastMessage = new SseMessageDTO();
- broadcastMessage.setMessage(payload.getMessage());
- broadcastMessage.setMessageType(payload.getType());
- broadcastMessage.setMessageSource(payload.getSource());
- broadcastMessage.setData(payload.getData());
- broadcastMessage.setPath(payload.getPath());
- broadcastMessage.setQuery(payload.getQuery());
- RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
- log.info("SSE发送主题订阅消息topic:{} type:{} source:{} message:{}",
- SSE_TOPIC, payload.getType(), payload.getSource(), payload.getMessage());
+ PushDTO dto = new PushDTO();
+ dto.setPayload(payload);
+ RedisUtils.publish(MessageConstants.MESSAGE_TOPIC, dto, consumer -> {
+ log.info("发送主题订阅消息topic:{} type:{} source:{} message:{}",
+ MessageConstants.MESSAGE_TOPIC, payload.getType(), payload.getSource(), payload.getMessage());
});
}
-
- private String buildPayload(SseMessageDTO sseMessageDTO) {
- PushPayload payload = PushPayload.of(
- sseMessageDTO.getMessageType(),
- sseMessageDTO.getMessageSource(),
- sseMessageDTO.getMessage(),
- sseMessageDTO.getData()
- );
- payload.setPath(sseMessageDTO.getPath());
- payload.setQuery(sseMessageDTO.getQuery());
- return JsonUtils.toJsonString(payload);
- }
}
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java
new file mode 100644
index 000000000..30f34df79
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java
@@ -0,0 +1,168 @@
+package org.dromara.common.push.core;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.map.MapUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.core.domain.dto.PushPayload;
+import org.dromara.common.core.utils.SpringUtils;
+import org.dromara.common.json.utils.JsonUtils;
+import org.dromara.common.push.dto.PushDTO;
+import org.dromara.common.redis.utils.RedisUtils;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.PongMessage;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import static org.dromara.common.push.constant.MessageConstants.MESSAGE_TOPIC;
+
+/**
+ * WebSocket 会话管理器。
+ *
+ * @author Lion Li
+ */
+@Slf4j
+public class WebSocketSessionManager implements PushSessionManager {
+
+ private static final Map> USER_TOKEN_SESSIONS = new ConcurrentHashMap<>();
+
+ public WebSocketSessionManager() {
+ SpringUtils.getBean(ScheduledExecutorService.class)
+ .scheduleWithFixedDelay(this::sessionMonitor, 60L, 60L, TimeUnit.SECONDS);
+ }
+
+ public void connect(Long userId, String token, WebSocketSession session) {
+ Map sessions = USER_TOKEN_SESSIONS.computeIfAbsent(userId, key -> new ConcurrentHashMap<>());
+ WebSocketSession oldSession = sessions.remove(token);
+ closeSession(oldSession, CloseStatus.NORMAL);
+ sessions.put(token, session);
+ }
+
+ public void disconnect(Long userId, String token) {
+ if (userId == null || token == null) {
+ return;
+ }
+ Map sessions = USER_TOKEN_SESSIONS.get(userId);
+ if (MapUtil.isEmpty(sessions)) {
+ USER_TOKEN_SESSIONS.remove(userId);
+ return;
+ }
+ closeSession(sessions.remove(token), CloseStatus.NORMAL);
+ if (sessions.isEmpty()) {
+ USER_TOKEN_SESSIONS.remove(userId);
+ }
+ }
+
+ public void sessionMonitor() {
+ List toRemoveUsers = new ArrayList<>();
+ USER_TOKEN_SESSIONS.forEach((userId, sessionMap) -> {
+ if (CollUtil.isEmpty(sessionMap)) {
+ toRemoveUsers.add(userId);
+ return;
+ }
+ sessionMap.entrySet().removeIf(entry -> {
+ WebSocketSession session = entry.getValue();
+ if (session == null || !session.isOpen()) {
+ closeSession(session, CloseStatus.NORMAL);
+ return true;
+ }
+ return false;
+ });
+ if (sessionMap.isEmpty()) {
+ toRemoveUsers.add(userId);
+ }
+ });
+ toRemoveUsers.forEach(USER_TOKEN_SESSIONS::remove);
+ }
+
+ @Override
+ public void subscribeMessage(Consumer consumer) {
+ RedisUtils.subscribe(MESSAGE_TOPIC, PushDTO.class, consumer);
+ }
+
+ @Override
+ public void sendMessage(Long userId, PushPayload payload) {
+ if (payload == null) {
+ return;
+ }
+ Map sessions = USER_TOKEN_SESSIONS.get(userId);
+ if (MapUtil.isEmpty(sessions)) {
+ USER_TOKEN_SESSIONS.remove(userId);
+ return;
+ }
+ sessions.entrySet().removeIf(entry -> {
+ WebSocketSession session = entry.getValue();
+ if (session == null || !session.isOpen()) {
+ closeSession(session, CloseStatus.NORMAL);
+ return true;
+ }
+ return !sendMessage(session, new TextMessage(JsonUtils.toJsonString(payload)));
+ });
+ if (sessions.isEmpty()) {
+ USER_TOKEN_SESSIONS.remove(userId);
+ }
+ }
+
+ @Override
+ public void sendMessage(PushPayload payload) {
+ USER_TOKEN_SESSIONS.keySet().forEach(userId -> sendMessage(userId, payload));
+ }
+
+ @Override
+ public void publishMessage(PushDTO pushDTO) {
+ RedisUtils.publish(MESSAGE_TOPIC, pushDTO, consumer -> log.info(
+ "WebSocket发送主题订阅消息topic:{} userIds:{} message:{}",
+ MESSAGE_TOPIC,
+ pushDTO.getUserIds(),
+ pushDTO.getPayload() == null ? null : pushDTO.getPayload().getMessage()
+ ));
+ }
+
+ @Override
+ public void publishAll(PushPayload payload) {
+ PushDTO dto = new PushDTO();
+ dto.setPayload(payload);
+ publishMessage(dto);
+ }
+
+ public void sendPongMessage(WebSocketSession session) {
+ sendMessage(session, new PongMessage());
+ }
+
+ public void sendMessage(WebSocketSession session, String message) {
+ sendMessage(session, new TextMessage(message));
+ }
+
+ private boolean sendMessage(WebSocketSession session, WebSocketMessage> message) {
+ if (session == null || !session.isOpen()) {
+ log.warn("[send] session会话已经关闭");
+ return false;
+ }
+ try {
+ session.sendMessage(message);
+ return true;
+ } catch (IOException e) {
+ log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
+ return false;
+ }
+ }
+
+ private void closeSession(WebSocketSession session, CloseStatus status) {
+ if (session == null) {
+ return;
+ }
+ try {
+ session.close(status);
+ } catch (Exception ignored) {
+ }
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/dto/PushDTO.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/dto/PushDTO.java
new file mode 100644
index 000000000..8a31df0b5
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/dto/PushDTO.java
@@ -0,0 +1,30 @@
+package org.dromara.common.push.dto;
+
+import lombok.Data;
+import org.dromara.common.core.domain.dto.PushPayload;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * 统一推送 DTO。
+ *
+ * @author Lion Li
+ */
+@Data
+public class PushDTO implements Serializable {
+
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * 目标用户 ID 列表,为空表示广播。
+ */
+ private List userIds;
+
+ /**
+ * 推送消息体。
+ */
+ private PushPayload payload;
+}
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/enums/MessageTransportEnum.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/enums/MessageTransportEnum.java
new file mode 100644
index 000000000..ee5660e1b
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/enums/MessageTransportEnum.java
@@ -0,0 +1,32 @@
+package org.dromara.common.push.enums;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.util.Arrays;
+
+/**
+ * 消息推送传输方式。
+ *
+ * @author Lion Li
+ */
+@Getter
+@AllArgsConstructor
+public enum MessageTransportEnum {
+
+ SSE("sse"),
+ WEBSOCKET("websocket");
+
+ private final String code;
+
+ public boolean matches(String transport) {
+ return code.equalsIgnoreCase(transport);
+ }
+
+ public static MessageTransportEnum of(String transport) {
+ return Arrays.stream(values())
+ .filter(item -> item.matches(transport))
+ .findFirst()
+ .orElse(SSE);
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java
new file mode 100644
index 000000000..f704c9245
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java
@@ -0,0 +1,104 @@
+package org.dromara.common.push.handler;
+
+import cn.hutool.core.util.ObjectUtil;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.core.domain.dto.PushPayload;
+import org.dromara.common.core.domain.model.LoginUser;
+import org.dromara.common.core.enums.PushSourceEnum;
+import org.dromara.common.core.enums.PushTypeEnum;
+import org.dromara.common.push.constant.MessageConstants;
+import org.dromara.common.push.core.WebSocketSessionManager;
+import org.dromara.common.push.dto.PushDTO;
+import org.springframework.web.socket.BinaryMessage;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.PongMessage;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.AbstractWebSocketHandler;
+import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * WebSocket Handler。
+ *
+ * @author Lion Li
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class PlusWebSocketHandler extends AbstractWebSocketHandler {
+
+ private final WebSocketSessionManager webSocketSessionManager;
+
+ @Override
+ public void afterConnectionEstablished(WebSocketSession session) throws IOException {
+ LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY);
+ String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY);
+ if (ObjectUtil.hasNull(loginUser, token)) {
+ session.close(CloseStatus.BAD_DATA);
+ log.info("[connect] invalid token received. sessionId: {}", session.getId());
+ return;
+ }
+ webSocketSessionManager.connect(
+ loginUser.getUserId(),
+ token,
+ new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64_000)
+ );
+ log.info("[connect] sessionId: {}, userId:{}, token:{}", session.getId(), loginUser.getUserId(), token);
+ }
+
+ @Override
+ protected void handleTextMessage(WebSocketSession session, TextMessage message) {
+ LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY);
+ if (ObjectUtil.isNull(loginUser)) {
+ return;
+ }
+ if (MessageConstants.PING.equalsIgnoreCase(message.getPayload())) {
+ webSocketSessionManager.sendMessage(session, MessageConstants.PONG);
+ return;
+ }
+ PushDTO dto = new PushDTO();
+ dto.setUserIds(List.of(loginUser.getUserId()));
+ dto.setPayload(PushPayload.of(
+ PushTypeEnum.CUSTOM,
+ PushSourceEnum.CLIENT,
+ message.getPayload(),
+ null
+ ));
+ webSocketSessionManager.publishMessage(dto);
+ }
+
+ @Override
+ protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
+ super.handleBinaryMessage(session, message);
+ }
+
+ @Override
+ protected void handlePongMessage(WebSocketSession session, PongMessage message) {
+ webSocketSessionManager.sendPongMessage(session);
+ }
+
+ @Override
+ public void handleTransportError(WebSocketSession session, Throwable exception) {
+ log.error("[transport error] sessionId: {}, exception:{}", session.getId(), exception.getMessage());
+ }
+
+ @Override
+ public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
+ LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY);
+ String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY);
+ if (ObjectUtil.hasNull(loginUser, token)) {
+ log.info("[disconnect] invalid token received. sessionId: {}", session.getId());
+ return;
+ }
+ webSocketSessionManager.disconnect(loginUser.getUserId(), token);
+ log.info("[disconnect] sessionId: {}, userId:{}, token:{}", session.getId(), loginUser.getUserId(), token);
+ }
+
+ @Override
+ public boolean supportsPartialMessages() {
+ return false;
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java
new file mode 100644
index 000000000..dedf5e462
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java
@@ -0,0 +1,80 @@
+package org.dromara.common.push.helper;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.dromara.common.core.domain.dto.PushPayload;
+import org.dromara.common.core.enums.PushSourceEnum;
+import org.dromara.common.core.enums.PushTypeEnum;
+import org.dromara.common.core.utils.SpringUtils;
+import org.dromara.common.push.core.PushSessionManager;
+import org.dromara.common.push.dto.PushDTO;
+
+import java.util.List;
+
+/**
+ * 统一消息推送工具。
+ *
+ * @author Lion Li
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PushHelper {
+
+ public static void sendMessage(Long userId, String message) {
+ sendMessage(userId, buildMessage(message));
+ }
+
+ public static void sendMessage(String message) {
+ sendMessage(buildMessage(message));
+ }
+
+ public static void sendMessage(Long userId, PushPayload payload) {
+ if (!isEnabled()) {
+ return;
+ }
+ getSessionManager().sendMessage(userId, payload);
+ }
+
+ public static void sendMessage(PushPayload payload) {
+ if (!isEnabled()) {
+ return;
+ }
+ getSessionManager().sendMessage(payload);
+ }
+
+ public static void publishMessage(List userIds, PushPayload payload) {
+ PushDTO dto = new PushDTO();
+ dto.setUserIds(userIds);
+ dto.setPayload(payload);
+ publishMessage(dto);
+ }
+
+ public static void publishMessage(PushDTO dto) {
+ if (!isEnabled() || dto == null || dto.getPayload() == null) {
+ return;
+ }
+ getSessionManager().publishMessage(dto);
+ }
+
+ public static void publishAll(String message) {
+ publishAll(buildMessage(message));
+ }
+
+ public static void publishAll(PushPayload payload) {
+ if (!isEnabled()) {
+ return;
+ }
+ getSessionManager().publishAll(payload);
+ }
+
+ public static boolean isEnabled() {
+ return Boolean.TRUE.equals(SpringUtils.getProperty("message.enabled", Boolean.class, Boolean.TRUE));
+ }
+
+ private static PushSessionManager getSessionManager() {
+ return SpringUtils.getBean(PushSessionManager.class);
+ }
+
+ private static PushPayload buildMessage(String message) {
+ return PushPayload.of(PushTypeEnum.MESSAGE, PushSourceEnum.BACKEND, message, null);
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java
similarity index 53%
rename from ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java
rename to ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java
index c70e37729..a830be5f9 100644
--- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java
@@ -1,4 +1,4 @@
-package org.dromara.common.websocket.interceptor;
+package org.dromara.common.push.interceptor;
import cn.dev33.satoken.exception.NotLoginException;
import cn.dev33.satoken.stp.StpUtil;
@@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.model.LoginUser;
import org.dromara.common.core.utils.ServletUtils;
import org.dromara.common.core.utils.StringUtils;
+import org.dromara.common.push.constant.MessageConstants;
import org.dromara.common.satoken.utils.LoginHelper;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
@@ -13,45 +14,35 @@ import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
-
-import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
-
/**
- * WebSocket握手请求的拦截器
+ * WebSocket 握手拦截器。
*
- * @author zendwang
+ * @author Lion Li
*/
@Slf4j
public class PlusWebSocketInterceptor implements HandshakeInterceptor {
- /**
- * WebSocket握手之前执行的前置处理方法
- *
- * @param request WebSocket握手请求
- * @param response WebSocket握手响应
- * @param wsHandler WebSocket处理程序
- * @param attributes 与WebSocket会话关联的属性
- * @return 如果允许握手继续进行,则返回true;否则返回false
- */
@Override
- public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) {
+ public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
+ Map attributes) {
try {
- // 检查是否登录 是否有token
LoginUser loginUser = LoginHelper.getLoginUser();
+ String tokenValue = StpUtil.getTokenValue();
+ if (loginUser == null || StringUtils.isBlank(tokenValue)) {
+ return false;
+ }
- // 解决 ws 不走 mvc 拦截器问题(cloud 版本不受影响)
- // 检查 header 与 param 里的 clientid 与 token 里的是否一致
String headerCid = ServletUtils.getRequest().getHeader(LoginHelper.CLIENT_KEY);
String paramCid = ServletUtils.getParameter(LoginHelper.CLIENT_KEY);
String clientId = StpUtil.getExtra(LoginHelper.CLIENT_KEY).toString();
if (!StringUtils.equalsAny(clientId, headerCid, paramCid)) {
- // token 无效
throw NotLoginException.newInstance(StpUtil.getLoginType(),
"-100", "客户端ID与Token不匹配",
StpUtil.getTokenValue());
}
- attributes.put(LOGIN_USER_KEY, loginUser);
+ attributes.put(MessageConstants.LOGIN_USER_KEY, loginUser);
+ attributes.put(MessageConstants.LOGIN_TOKEN_KEY, tokenValue);
return true;
} catch (NotLoginException e) {
log.error("WebSocket 认证失败'{}',无法访问系统资源", e.getMessage());
@@ -59,17 +50,8 @@ public class PlusWebSocketInterceptor implements HandshakeInterceptor {
}
}
- /**
- * WebSocket握手成功后执行的后置处理方法
- *
- * @param request WebSocket握手请求
- * @param response WebSocket握手响应
- * @param wsHandler WebSocket处理程序
- * @param exception 握手过程中可能出现的异常
- */
@Override
- public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
- // 在这个方法中可以执行一些握手成功后的后续处理逻辑,比如记录日志或者其他操作
+ public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
+ Exception exception) {
}
-
}
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java
new file mode 100644
index 000000000..4f1bbad7a
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java
@@ -0,0 +1,44 @@
+package org.dromara.common.push.listener;
+
+import cn.hutool.core.collection.CollUtil;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.push.core.PushSessionManager;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.Ordered;
+
+/**
+ * 统一消息主题订阅监听器。
+ *
+ * @author Lion Li
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class MessageTopicListener implements ApplicationRunner, Ordered {
+
+ private final PushSessionManager pushSessionManager;
+
+ @Override
+ public void run(ApplicationArguments args) {
+ pushSessionManager.subscribeMessage(message -> {
+ log.info("消息主题订阅收到消息userIds={} message={}",
+ message.getUserIds(),
+ message.getPayload() == null ? null : message.getPayload().getMessage());
+ if (message.getPayload() == null) {
+ return;
+ }
+ if (CollUtil.isNotEmpty(message.getUserIds())) {
+ message.getUserIds().forEach(userId -> pushSessionManager.sendMessage(userId, message.getPayload()));
+ } else {
+ pushSessionManager.sendMessage(message.getPayload());
+ }
+ });
+ log.info("初始化消息主题订阅监听器成功");
+ }
+
+ @Override
+ public int getOrder() {
+ return -1;
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/properties/MessageProperties.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/properties/MessageProperties.java
new file mode 100644
index 000000000..f444123ef
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/properties/MessageProperties.java
@@ -0,0 +1,35 @@
+package org.dromara.common.push.properties;
+
+import lombok.Data;
+import org.dromara.common.push.enums.MessageTransportEnum;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * 统一消息推送配置。
+ *
+ * @author Lion Li
+ */
+@Data
+@ConfigurationProperties("message")
+public class MessageProperties {
+
+ /**
+ * 是否启用消息推送。
+ */
+ private Boolean enabled = true;
+
+ /**
+ * 传输方式:sse / websocket。
+ */
+ private String transport = MessageTransportEnum.SSE.getCode();
+
+ /**
+ * 统一访问路径。
+ */
+ private String path = "/resource/message";
+
+ /**
+ * WebSocket 允许的跨域来源。
+ */
+ private String allowedOrigins = "*";
+}
diff --git a/ruoyi-common/ruoyi-common-push/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-push/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..2e02f912d
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-push/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1,3 @@
+org.dromara.common.push.config.MessageAutoConfiguration
+org.dromara.common.push.config.MessageSseConfiguration
+org.dromara.common.push.config.MessageWebSocketConfiguration
diff --git a/ruoyi-common/ruoyi-common-security/src/main/java/org/dromara/common/security/config/SecurityConfig.java b/ruoyi-common/ruoyi-common-security/src/main/java/org/dromara/common/security/config/SecurityConfig.java
index e94f4dfc1..eee99c74c 100644
--- a/ruoyi-common/ruoyi-common-security/src/main/java/org/dromara/common/security/config/SecurityConfig.java
+++ b/ruoyi-common/ruoyi-common-security/src/main/java/org/dromara/common/security/config/SecurityConfig.java
@@ -39,8 +39,8 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
public class SecurityConfig implements WebMvcConfigurer {
private final SecurityProperties securityProperties;
- @Value("${sse.path}")
- private String ssePath;
+ @Value("${message.path:/resource/message}")
+ private String messagePath;
/**
* 注册 Sa-Token 路由拦截器并配置鉴权规则。
@@ -85,7 +85,7 @@ public class SecurityConfig implements WebMvcConfigurer {
})).addPathPatterns("/**")
// 排除不需要拦截的路径
.excludePathPatterns(securityProperties.getExcludes())
- .excludePathPatterns(ssePath);
+ .excludePathPatterns(messagePath);
}
/**
diff --git a/ruoyi-common/ruoyi-common-sse/pom.xml b/ruoyi-common/ruoyi-common-sse/pom.xml
deleted file mode 100644
index ae44c988e..000000000
--- a/ruoyi-common/ruoyi-common-sse/pom.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-
-
-
- org.dromara
- ruoyi-common
- ${revision}
-
- 4.0.0
-
- ruoyi-common-sse
-
-
- ruoyi-common-sse 模块
-
-
-
-
- org.dromara
- ruoyi-common-core
-
-
- org.dromara
- ruoyi-common-redis
-
-
- org.dromara
- ruoyi-common-satoken
-
-
- org.dromara
- ruoyi-common-json
-
-
-
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java
deleted file mode 100644
index 0cf8054ed..000000000
--- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.dromara.common.sse.config;
-
-import org.dromara.common.sse.controller.SseController;
-import org.dromara.common.sse.core.SseEmitterManager;
-import org.dromara.common.sse.listener.SseTopicListener;
-import org.springframework.boot.autoconfigure.AutoConfiguration;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-
-/**
- * SSE 自动装配
- *
- * @author Lion Li
- */
-@AutoConfiguration
-@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
-@EnableConfigurationProperties(SseProperties.class)
-public class SseAutoConfiguration {
-
- @Bean
- public SseEmitterManager sseEmitterManager() {
- return new SseEmitterManager();
- }
-
- @Bean
- public SseTopicListener sseTopicListener() {
- return new SseTopicListener();
- }
-
- @Bean
- public SseController sseController(SseEmitterManager sseEmitterManager) {
- return new SseController(sseEmitterManager);
- }
-
-}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
deleted file mode 100644
index ce4e1732d..000000000
--- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.dromara.common.sse.config;
-
-import lombok.Data;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-/**
- * SSE 配置项
- *
- * @author Lion Li
- */
-@Data
-@ConfigurationProperties("sse")
-public class SseProperties {
-
- private Boolean enabled;
-
- /**
- * 路径
- */
- private String path;
-}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDTO.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDTO.java
deleted file mode 100644
index d0fb22fc3..000000000
--- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDTO.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package org.dromara.common.sse.dto;
-
-import lombok.Data;
-
-import java.io.Serial;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-/**
- * 消息的dto
- *
- * @author zendwang
- */
-@Data
-public class SseMessageDTO implements Serializable {
-
- @Serial
- private static final long serialVersionUID = 1L;
-
- /**
- * 需要推送到的session key 列表
- */
- private List userIds;
-
- /**
- * 需要发送的消息
- */
- private String message;
-
- /**
- * 消息类型
- */
- private String messageType;
-
- /**
- * 消息来源
- */
- private String messageSource;
-
- /**
- * 扩展数据
- */
- private Object data;
-
- /**
- * 前端跳转路径
- */
- private String path;
-
- /**
- * 前端跳转参数
- */
- private Map query;
-}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java
deleted file mode 100644
index 87dc02e70..000000000
--- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package org.dromara.common.sse.listener;
-
-import cn.hutool.core.collection.CollUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.dromara.common.sse.core.SseEmitterManager;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.core.Ordered;
-
-/**
- * SSE 主题订阅监听器
- *
- * @author Lion Li
- */
-@Slf4j
-public class SseTopicListener implements ApplicationRunner, Ordered {
-
- @Autowired
- private SseEmitterManager sseEmitterManager;
-
- /**
- * 在Spring Boot应用程序启动时初始化SSE主题订阅监听器
- *
- * @param args 应用程序参数
- * @throws Exception 初始化过程中可能抛出的异常
- */
- @Override
- public void run(ApplicationArguments args) throws Exception {
- sseEmitterManager.subscribeMessage((message) -> {
- log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage());
- // 如果key不为空就按照key发消息 如果为空就群发
- if (CollUtil.isNotEmpty(message.getUserIds())) {
- message.getUserIds().forEach(key -> {
- sseEmitterManager.sendMessage(key, message);
- });
- } else {
- sseEmitterManager.sendMessage(
- org.dromara.common.core.domain.dto.PushPayload.of(
- message.getMessageType(),
- message.getMessageSource(),
- message.getMessage(),
- message.getData()
- )
- );
- }
- });
- log.info("初始化SSE主题订阅监听器成功");
- }
-
- @Override
- public int getOrder() {
- return -1;
- }
-}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java
deleted file mode 100644
index 3dd613b30..000000000
--- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package org.dromara.common.sse.utils;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.dromara.common.core.domain.dto.PushPayload;
-import org.dromara.common.core.enums.PushSourceEnum;
-import org.dromara.common.core.enums.PushTypeEnum;
-import org.dromara.common.core.utils.SpringUtils;
-import org.dromara.common.sse.core.SseEmitterManager;
-import org.dromara.common.sse.dto.SseMessageDTO;
-
-import java.util.List;
-
-/**
- * SSE工具类
- *
- * @author Lion Li
- */
-@Slf4j
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class SseMessageUtils {
-
- private final static Boolean SSE_ENABLE = SpringUtils.getProperty("sse.enabled", Boolean.class, true);
- private static SseEmitterManager MANAGER;
-
- static {
- if (isEnable() && MANAGER == null) {
- MANAGER = SpringUtils.getBean(SseEmitterManager.class);
- }
- }
-
- /**
- * 向指定用户的 SSE 会话发送消息。
- *
- * @param userId 要发送消息的用户id
- * @param message 要发送的消息内容
- */
- public static void sendMessage(Long userId, String message) {
- if (!isEnable()) {
- return;
- }
- MANAGER.sendMessage(userId, buildMessage(message));
- }
-
- /**
- * 向当前节点上的所有 SSE 会话发送消息。
- *
- * @param message 要发送的消息内容
- */
- public static void sendMessage(String message) {
- if (!isEnable()) {
- return;
- }
- MANAGER.sendMessage(buildMessage(message));
- }
-
- /**
- * 向指定用户的 SSE 会话发送统一 JSON 消息。
- *
- * @param userId 要发送消息的用户id
- * @param payload 要发送的消息体
- */
- public static void sendMessage(Long userId, PushPayload payload) {
- if (!isEnable()) {
- return;
- }
- MANAGER.sendMessage(userId, payload);
- }
-
- /**
- * 向当前节点上的所有 SSE 会话发送统一 JSON 消息。
- *
- * @param payload 要发送的消息体
- */
- public static void sendMessage(PushPayload payload) {
- if (!isEnable()) {
- return;
- }
- MANAGER.sendMessage(payload);
- }
-
- /**
- * 发布 SSE 订阅消息。
- *
- * @param sseMessageDTO 要发布的SSE消息对象
- */
- public static void publishMessage(SseMessageDTO sseMessageDTO) {
- if (!isEnable()) {
- return;
- }
- MANAGER.publishMessage(sseMessageDTO);
- }
-
- /**
- * 向所有用户发布 SSE 广播消息。
- *
- * @param message 要发布的消息内容
- */
- public static void publishAll(String message) {
- if (!isEnable()) {
- return;
- }
- MANAGER.publishAll(buildMessage(message));
- }
-
- /**
- * 向指定用户发布统一 JSON 消息。
- *
- * @param userIds 目标用户
- * @param payload 消息体
- */
- public static void publishMessage(List userIds, PushPayload payload) {
- if (!isEnable()) {
- return;
- }
- SseMessageDTO dto = new SseMessageDTO();
- dto.setUserIds(userIds);
- dto.setMessage(payload.getMessage());
- dto.setMessageType(payload.getType());
- dto.setMessageSource(payload.getSource());
- dto.setData(payload.getData());
- dto.setPath(payload.getPath());
- dto.setQuery(payload.getQuery());
- MANAGER.publishMessage(dto);
- }
-
- /**
- * 向所有用户发布统一 JSON 消息。
- *
- * @param payload 消息体
- */
- public static void publishAll(PushPayload payload) {
- if (!isEnable()) {
- return;
- }
- MANAGER.publishAll(payload);
- }
-
- /**
- * 判断 SSE 功能是否启用。
- *
- * @return 是否启用
- */
- public static Boolean isEnable() {
- return SSE_ENABLE;
- }
-
- private static PushPayload buildMessage(String message) {
- return PushPayload.of(PushTypeEnum.MESSAGE, PushSourceEnum.BACKEND, message, null);
- }
-
-}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
deleted file mode 100644
index b80971390..000000000
--- a/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ /dev/null
@@ -1 +0,0 @@
-org.dromara.common.sse.config.SseAutoConfiguration
diff --git a/ruoyi-common/ruoyi-common-web/src/main/java/org/dromara/common/web/handler/GlobalExceptionHandler.java b/ruoyi-common/ruoyi-common-web/src/main/java/org/dromara/common/web/handler/GlobalExceptionHandler.java
index c59f0f2ac..d02089956 100644
--- a/ruoyi-common/ruoyi-common-web/src/main/java/org/dromara/common/web/handler/GlobalExceptionHandler.java
+++ b/ruoyi-common/ruoyi-common-web/src/main/java/org/dromara/common/web/handler/GlobalExceptionHandler.java
@@ -11,6 +11,7 @@ import org.dromara.common.core.domain.R;
import org.dromara.common.core.exception.ServiceException;
import org.dromara.common.core.exception.SseException;
import org.dromara.common.core.exception.base.BaseException;
+import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StreamUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.springframework.boot.json.JsonParseException;
@@ -129,7 +130,8 @@ public class GlobalExceptionHandler {
@ExceptionHandler(IOException.class)
public void handleIoException(IOException e, HttpServletRequest request) {
String requestURI = request.getRequestURI();
- if (requestURI.contains("sse")) {
+ String path = SpringUtils.getProperty("message.path");
+ if (requestURI.contains(path)) {
// sse 经常性连接中断 例如关闭浏览器 直接屏蔽
return;
}
diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java
deleted file mode 100644
index ef5cfc96f..000000000
--- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.dromara.common.websocket.config;
-
-import cn.hutool.core.util.StrUtil;
-import org.dromara.common.websocket.config.properties.WebSocketProperties;
-import org.dromara.common.websocket.handler.PlusWebSocketHandler;
-import org.dromara.common.websocket.interceptor.PlusWebSocketInterceptor;
-import org.dromara.common.websocket.listener.WebSocketTopicListener;
-import org.springframework.boot.autoconfigure.AutoConfiguration;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.web.socket.WebSocketHandler;
-import org.springframework.web.socket.config.annotation.EnableWebSocket;
-import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
-import org.springframework.web.socket.server.HandshakeInterceptor;
-
-/**
- * WebSocket 配置
- *
- * @author zendwang
- */
-@AutoConfiguration
-@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true")
-@EnableConfigurationProperties(WebSocketProperties.class)
-@EnableWebSocket
-public class WebSocketConfig {
-
- @Bean
- public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor,
- WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) {
- // 如果WebSocket的路径为空,则设置默认路径为 "/websocket"
- if (StrUtil.isBlank(webSocketProperties.getPath())) {
- webSocketProperties.setPath("/websocket");
- }
-
- // 如果允许跨域访问的地址为空,则设置为 "*",表示允许所有来源的跨域请求
- if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) {
- webSocketProperties.setAllowedOrigins("*");
- }
-
- // 返回一个WebSocketConfigurer对象,用于配置WebSocket
- return registry -> registry
- // 添加WebSocket处理程序和拦截器到指定路径,设置允许的跨域来源
- .addHandler(webSocketHandler, webSocketProperties.getPath())
- .addInterceptors(handshakeInterceptor)
- .setAllowedOrigins(webSocketProperties.getAllowedOrigins());
- }
-
- @Bean
- public HandshakeInterceptor handshakeInterceptor() {
- return new PlusWebSocketInterceptor();
- }
-
- @Bean
- public WebSocketHandler webSocketHandler() {
- return new PlusWebSocketHandler();
- }
-
- @Bean
- public WebSocketTopicListener topicListener() {
- return new WebSocketTopicListener();
- }
-}
diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java
deleted file mode 100644
index d629fe55a..000000000
--- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.dromara.common.websocket.config.properties;
-
-import lombok.Data;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-/**
- * WebSocket 配置项
- *
- * @author zendwang
- */
-@ConfigurationProperties("websocket")
-@Data
-public class WebSocketProperties {
-
- private Boolean enabled;
-
- /**
- * 路径
- */
- private String path;
-
- /**
- * 设置访问源地址
- */
- private String allowedOrigins;
-}
diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java
deleted file mode 100644
index e243279d9..000000000
--- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.dromara.common.websocket.constant;
-
-/**
- * websocket的常量配置
- *
- * @author zendwang
- */
-public interface WebSocketConstants {
-
- /**
- * websocketSession中的参数的key
- */
- String LOGIN_USER_KEY = "loginUser";
-
- /**
- * 订阅的频道
- */
- String WEB_SOCKET_TOPIC = "global:websocket";
-
- /**
- * 前端心跳检查的命令
- */
- String PING = "ping";
-
- /**
- * 服务端心跳恢复的字符串
- */
- String PONG = "pong";
-}
diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/dto/WebSocketMessageDTO.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/dto/WebSocketMessageDTO.java
deleted file mode 100644
index bbf9c66e6..000000000
--- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/dto/WebSocketMessageDTO.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package org.dromara.common.websocket.dto;
-
-import lombok.Data;
-
-import java.io.Serial;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-/**
- * 消息的dto
- *
- * @author zendwang
- */
-@Data
-public class WebSocketMessageDTO implements Serializable {
-
- @Serial
- private static final long serialVersionUID = 1L;
-
- /**
- * 需要推送到的session key 列表
- */
- private List sessionKeys;
-
- /**
- * 需要发送的消息
- */
- private String message;
-
- /**
- * 消息类型
- */
- private String messageType;
-
- /**
- * 消息来源
- */
- private String messageSource;
-
- /**
- * 扩展数据
- */
- private Object data;
-
- /**
- * 前端跳转路径
- */
- private String path;
-
- /**
- * 前端跳转参数
- */
- private Map query;
-}
diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java
deleted file mode 100644
index f841dd39c..000000000
--- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.dromara.common.websocket.handler;
-
-import cn.hutool.core.util.ObjectUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.dromara.common.core.enums.PushSourceEnum;
-import org.dromara.common.core.enums.PushTypeEnum;
-import org.dromara.common.core.domain.model.LoginUser;
-import org.dromara.common.websocket.dto.WebSocketMessageDTO;
-import org.dromara.common.websocket.holder.WebSocketSessionHolder;
-import org.dromara.common.websocket.utils.WebSocketUtils;
-import org.springframework.web.socket.*;
-import org.springframework.web.socket.handler.AbstractWebSocketHandler;
-import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
-
-import java.io.IOException;
-import java.util.List;
-
-import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
-
-/**
- * WebSocketHandler 实现类
- *
- * @author zendwang
- */
-@Slf4j
-public class PlusWebSocketHandler extends AbstractWebSocketHandler {
-
- /**
- * 连接成功后
- */
- @Override
- public void afterConnectionEstablished(WebSocketSession session) throws IOException {
- LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
- if (ObjectUtil.isNull(loginUser)) {
- session.close(CloseStatus.BAD_DATA);
- log.info("[connect] invalid token received. sessionId: {}", session.getId());
- return;
- }
- WebSocketSessionHolder.addSession(loginUser.getUserId(), new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64000));
- log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());
- }
-
- /**
- * 处理接收到的文本消息
- *
- * @param session WebSocket会话
- * @param message 接收到的文本消息
- * @throws Exception 处理消息过程中可能抛出的异常
- */
- @Override
- protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
- // 从WebSocket会话中获取登录用户信息
- LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
-
- // 创建WebSocket消息DTO对象
- WebSocketMessageDTO messageDTO = new WebSocketMessageDTO();
- messageDTO.setSessionKeys(List.of(loginUser.getUserId()));
- messageDTO.setMessage(message.getPayload());
- messageDTO.setMessageType(PushTypeEnum.CUSTOM.getType());
- messageDTO.setMessageSource(PushSourceEnum.CLIENT.getSource());
- WebSocketUtils.publishMessage(messageDTO);
- }
-
- /**
- * 处理接收到的二进制消息
- *
- * @param session WebSocket会话
- * @param message 接收到的二进制消息
- * @throws Exception 处理消息过程中可能抛出的异常
- */
- @Override
- protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
- super.handleBinaryMessage(session, message);
- }
-
- /**
- * 处理接收到的Pong消息(心跳监测)
- *
- * @param session WebSocket会话
- * @param message 接收到的Pong消息
- * @throws Exception 处理消息过程中可能抛出的异常
- */
- @Override
- protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
- WebSocketUtils.sendPongMessage(session);
- }
-
- /**
- * 处理WebSocket传输错误
- *
- * @param session WebSocket会话
- * @param exception 发生的异常
- * @throws Exception 处理过程中可能抛出的异常
- */
- @Override
- public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
- log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
- }
-
- /**
- * 在WebSocket连接关闭后执行清理操作
- *
- * @param session WebSocket会话
- * @param status 关闭状态信息
- */
- @Override
- public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
- LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
- if (ObjectUtil.isNull(loginUser)) {
- log.info("[disconnect] invalid token received. sessionId: {}", session.getId());
- return;
- }
- WebSocketSessionHolder.removeSession(loginUser.getUserId());
- log.info("[disconnect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());
- }
-
- /**
- * 指示处理程序是否支持接收部分消息
- *
- * @return 如果支持接收部分消息,则返回true;否则返回false
- */
- @Override
- public boolean supportsPartialMessages() {
- return false;
- }
-
-}
diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java
deleted file mode 100644
index 9c2372b85..000000000
--- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.dromara.common.websocket.holder;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.springframework.web.socket.CloseStatus;
-import org.springframework.web.socket.WebSocketSession;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * WebSocketSession 用于保存当前所有在线的会话信息
- *
- * @author zendwang
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class WebSocketSessionHolder {
-
- private static final Map USER_SESSION_MAP = new ConcurrentHashMap<>();
-
- /**
- * 将WebSocket会话添加到用户会话Map中
- *
- * @param sessionKey 会话键,用于检索会话
- * @param session 要添加的WebSocket会话
- */
- public static void addSession(Long sessionKey, WebSocketSession session) {
- removeSession(sessionKey);
- USER_SESSION_MAP.put(sessionKey, session);
- }
-
- /**
- * 从用户会话Map中移除指定会话键对应的WebSocket会话
- *
- * @param sessionKey 要移除的会话键
- */
- public static void removeSession(Long sessionKey) {
- WebSocketSession session = USER_SESSION_MAP.remove(sessionKey);
- try {
- session.close(CloseStatus.BAD_DATA);
- } catch (Exception ignored) {
- }
- }
-
- /**
- * 根据会话键从用户会话Map中获取WebSocket会话
- *
- * @param sessionKey 要获取的会话键
- * @return 与给定会话键对应的WebSocket会话,如果不存在则返回null
- */
- public static WebSocketSession getSessions(Long sessionKey) {
- return USER_SESSION_MAP.get(sessionKey);
- }
-
- /**
- * 获取存储在用户会话Map中所有WebSocket会话的会话键集合
- *
- * @return 所有WebSocket会话的会话键集合
- */
- public static Set getSessionsAll() {
- return USER_SESSION_MAP.keySet();
- }
-
- /**
- * 检查给定的会话键是否存在于用户会话Map中
- *
- * @param sessionKey 要检查的会话键
- * @return 如果存在对应的会话键,则返回true;否则返回false
- */
- public static Boolean existSession(Long sessionKey) {
- return USER_SESSION_MAP.containsKey(sessionKey);
- }
-}
diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java
deleted file mode 100644
index 002d99a16..000000000
--- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.dromara.common.websocket.listener;
-
-import cn.hutool.core.collection.CollUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.dromara.common.websocket.holder.WebSocketSessionHolder;
-import org.dromara.common.websocket.utils.WebSocketUtils;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.core.Ordered;
-
-/**
- * WebSocket 主题订阅监听器
- *
- * @author zendwang
- */
-@Slf4j
-public class WebSocketTopicListener implements ApplicationRunner, Ordered {
-
- /**
- * 在Spring Boot应用程序启动时初始化WebSocket主题订阅监听器
- *
- * @param args 应用程序参数
- * @throws Exception 初始化过程中可能抛出的异常
- */
- @Override
- public void run(ApplicationArguments args) throws Exception {
- // 订阅WebSocket消息
- WebSocketUtils.subscribeMessage((message) -> {
- log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage());
- // 如果key不为空就按照key发消息 如果为空就群发
- if (CollUtil.isNotEmpty(message.getSessionKeys())) {
- message.getSessionKeys().forEach(key -> {
- if (WebSocketSessionHolder.existSession(key)) {
- WebSocketUtils.sendMessage(key, message);
- }
- });
- } else {
- WebSocketSessionHolder.getSessionsAll().forEach(key -> {
- WebSocketUtils.sendMessage(key, message);
- });
- }
- });
- log.info("初始化WebSocket主题订阅监听器成功");
- }
-
- @Override
- public int getOrder() {
- return -1;
- }
-}
diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java
deleted file mode 100644
index 9d476fdd4..000000000
--- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java
+++ /dev/null
@@ -1,189 +0,0 @@
-package org.dromara.common.websocket.utils;
-
-import cn.hutool.core.collection.CollUtil;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.dromara.common.core.domain.dto.PushPayload;
-import org.dromara.common.json.utils.JsonUtils;
-import org.dromara.common.redis.utils.RedisUtils;
-import org.dromara.common.websocket.dto.WebSocketMessageDTO;
-import org.dromara.common.websocket.holder.WebSocketSessionHolder;
-import org.springframework.web.socket.PongMessage;
-import org.springframework.web.socket.TextMessage;
-import org.springframework.web.socket.WebSocketMessage;
-import org.springframework.web.socket.WebSocketSession;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Consumer;
-
-import static org.dromara.common.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC;
-
-/**
- * 工具类
- *
- * @author zendwang
- */
-@Slf4j
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class WebSocketUtils {
-
- /**
- * 向指定会话标识发送文本消息。
- *
- * @param sessionKey 要发送消息的用户id
- * @param message 要发送的消息内容
- */
- public static void sendMessage(Long sessionKey, String message) {
- WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
- sendMessage(session, message);
- }
-
- /**
- * 向指定会话标识发送统一 JSON 消息。
- *
- * @param sessionKey 要发送消息的用户id
- * @param webSocketMessage 要发送的消息内容
- */
- public static void sendMessage(Long sessionKey, WebSocketMessageDTO webSocketMessage) {
- WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
- sendMessage(session, buildPayload(webSocketMessage));
- }
-
- /**
- * 订阅 WebSocket 广播主题消息。
- *
- * @param consumer 处理WebSocket消息的消费者函数
- */
- public static void subscribeMessage(Consumer consumer) {
- RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDTO.class, consumer);
- }
-
- /**
- * 按会话标识发布 WebSocket 消息。
- *
- * @param webSocketMessage 要发布的WebSocket消息对象
- */
- public static void publishMessage(WebSocketMessageDTO webSocketMessage) {
- List unsentSessionKeys = new ArrayList<>();
- // 当前服务内session,直接发送消息
- for (Long sessionKey : webSocketMessage.getSessionKeys()) {
- if (WebSocketSessionHolder.existSession(sessionKey)) {
- WebSocketUtils.sendMessage(sessionKey, webSocketMessage);
- continue;
- }
- unsentSessionKeys.add(sessionKey);
- }
- // 不在当前服务内session,发布订阅消息
- if (CollUtil.isNotEmpty(unsentSessionKeys)) {
- WebSocketMessageDTO broadcastMessage = new WebSocketMessageDTO();
- broadcastMessage.setSessionKeys(unsentSessionKeys);
- broadcastMessage.setMessage(webSocketMessage.getMessage());
- RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
- log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}",
- WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
- });
- }
- }
-
- /**
- * 向所有 WebSocket 会话发布广播消息。
- *
- * @param message 要发布的消息内容
- */
- public static void publishAll(String message) {
- WebSocketMessageDTO broadcastMessage = new WebSocketMessageDTO();
- broadcastMessage.setMessage(message);
- RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
- log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
- });
- }
-
- /**
- * 向指定会话标识发布统一 JSON 消息。
- *
- * @param sessionKeys 目标会话
- * @param payload 消息体
- */
- public static void publishMessage(List sessionKeys, PushPayload payload) {
- WebSocketMessageDTO dto = new WebSocketMessageDTO();
- dto.setSessionKeys(sessionKeys);
- dto.setMessage(payload.getMessage());
- dto.setMessageType(payload.getType());
- dto.setMessageSource(payload.getSource());
- dto.setData(payload.getData());
- dto.setPath(payload.getPath());
- dto.setQuery(payload.getQuery());
- publishMessage(dto);
- }
-
- /**
- * 向所有 WebSocket 会话发布统一 JSON 消息。
- *
- * @param payload 消息体
- */
- public static void publishAll(PushPayload payload) {
- WebSocketMessageDTO broadcastMessage = new WebSocketMessageDTO();
- broadcastMessage.setMessage(payload.getMessage());
- broadcastMessage.setMessageType(payload.getType());
- broadcastMessage.setMessageSource(payload.getSource());
- broadcastMessage.setData(payload.getData());
- broadcastMessage.setPath(payload.getPath());
- broadcastMessage.setQuery(payload.getQuery());
- RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
- log.info("WebSocket发送主题订阅消息topic:{} type:{} source:{} message:{}",
- WEB_SOCKET_TOPIC, payload.getType(), payload.getSource(), payload.getMessage());
- });
- }
-
- /**
- * 向指定会话发送 Pong 心跳消息。
- *
- * @param session 要发送Pong消息的WebSocket会话
- */
- public static void sendPongMessage(WebSocketSession session) {
- sendMessage(session, new PongMessage());
- }
-
- /**
- * 向指定 WebSocket 会话发送文本消息。
- *
- * @param session WebSocket会话
- * @param message 要发送的文本消息内容
- */
- public static void sendMessage(WebSocketSession session, String message) {
- sendMessage(session, new TextMessage(message));
- }
-
- private static String buildPayload(WebSocketMessageDTO webSocketMessage) {
- PushPayload payload = PushPayload.of(
- webSocketMessage.getMessageType(),
- webSocketMessage.getMessageSource(),
- webSocketMessage.getMessage(),
- webSocketMessage.getData()
- );
- payload.setPath(webSocketMessage.getPath());
- payload.setQuery(webSocketMessage.getQuery());
- return JsonUtils.toJsonString(payload);
- }
-
- /**
- * 向指定 WebSocket 会话发送原始消息对象。
- *
- * @param session WebSocket会话
- * @param message 要发送的WebSocket消息对象
- */
- private static void sendMessage(WebSocketSession session, WebSocketMessage> message) {
- if (session == null || !session.isOpen()) {
- log.warn("[send] session会话已经关闭");
- } else {
- try {
- session.sendMessage(message);
- } catch (IOException e) {
- log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
- }
- }
- }
-}
diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
deleted file mode 100644
index c3a7305a3..000000000
--- a/ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ /dev/null
@@ -1 +0,0 @@
-org.dromara.common.websocket.config.WebSocketConfig
diff --git a/ruoyi-modules/ruoyi-demo/pom.xml b/ruoyi-modules/ruoyi-demo/pom.xml
index c61fb47a0..ec948ceee 100644
--- a/ruoyi-modules/ruoyi-demo/pom.xml
+++ b/ruoyi-modules/ruoyi-demo/pom.xml
@@ -85,7 +85,7 @@
org.dromara
- ruoyi-common-websocket
+ ruoyi-common-push
diff --git a/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/controller/WebSocketController.java b/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/controller/WebSocketController.java
index 6497ee5d7..92c738a71 100644
--- a/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/controller/WebSocketController.java
+++ b/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/controller/WebSocketController.java
@@ -1,14 +1,18 @@
package org.dromara.demo.controller;
import org.dromara.common.core.domain.R;
-import org.dromara.common.websocket.dto.WebSocketMessageDTO;
-import org.dromara.common.websocket.utils.WebSocketUtils;
+import org.dromara.common.core.domain.dto.PushPayload;
+import org.dromara.common.core.enums.PushSourceEnum;
+import org.dromara.common.core.enums.PushTypeEnum;
+import org.dromara.common.push.helper.PushHelper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import java.util.List;
+
/**
* WebSocket 演示案例
*
@@ -23,11 +27,22 @@ public class WebSocketController {
/**
* 发布消息
*
- * @param dto 发送内容
+ * @param userId 目标用户
+ * @param message 发送内容
*/
@GetMapping("/send")
- public R send(WebSocketMessageDTO dto) throws InterruptedException {
- WebSocketUtils.publishMessage(dto);
+ public R send(Long userId, String message) {
+ PushPayload payload = PushPayload.of(
+ PushTypeEnum.MESSAGE,
+ PushSourceEnum.BACKEND,
+ message,
+ null
+ );
+ if (userId == null) {
+ PushHelper.publishAll(payload);
+ } else {
+ PushHelper.publishMessage(List.of(userId), payload);
+ }
return R.ok("操作成功");
}
}
diff --git a/ruoyi-modules/ruoyi-system/pom.xml b/ruoyi-modules/ruoyi-system/pom.xml
index a24883dff..eaa6ab0c4 100644
--- a/ruoyi-modules/ruoyi-system/pom.xml
+++ b/ruoyi-modules/ruoyi-system/pom.xml
@@ -82,12 +82,7 @@
org.dromara
- ruoyi-common-websocket
-
-
-
- org.dromara
- ruoyi-common-sse
+ ruoyi-common-push
diff --git a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/controller/system/SysNoticeController.java b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/controller/system/SysNoticeController.java
index e7d8d7ecd..53ec668c4 100644
--- a/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/controller/system/SysNoticeController.java
+++ b/ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/controller/system/SysNoticeController.java
@@ -4,15 +4,15 @@ import cn.dev33.satoken.annotation.SaCheckPermission;
import lombok.RequiredArgsConstructor;
import org.dromara.common.core.domain.PageResult;
import org.dromara.common.core.domain.R;
+import org.dromara.common.core.domain.dto.PushPayload;
import org.dromara.common.core.enums.PushSourceEnum;
import org.dromara.common.core.enums.PushTypeEnum;
import org.dromara.common.core.service.DictService;
import org.dromara.common.log.annotation.Log;
import org.dromara.common.log.enums.BusinessType;
import org.dromara.common.mybatis.core.page.PageQuery;
+import org.dromara.common.push.helper.PushHelper;
import org.dromara.common.redis.annotation.RepeatSubmit;
-import org.dromara.common.sse.dto.SseMessageDTO;
-import org.dromara.common.sse.utils.SseMessageUtils;
import org.dromara.common.web.core.BaseController;
import org.dromara.system.domain.bo.SysNoticeBo;
import org.dromara.system.domain.vo.SysNoticeVo;
@@ -82,13 +82,14 @@ public class SysNoticeController extends BaseController {
data.put("noticeType", notice.getNoticeType());
data.put("noticeTypeLabel", type);
data.put("noticeTitle", notice.getNoticeTitle());
- SseMessageDTO dto = new SseMessageDTO();
- dto.setMessage("[" + type + "] " + notice.getNoticeTitle());
- dto.setMessageType(PushTypeEnum.NOTICE.getType());
- dto.setMessageSource(PushSourceEnum.NOTICE.getSource());
- dto.setData(data);
- dto.setPath("/system/notice");
- SseMessageUtils.publishMessage(dto);
+ PushHelper.publishAll(PushPayload.of(
+ PushTypeEnum.NOTICE,
+ PushSourceEnum.NOTICE,
+ "[" + type + "] " + notice.getNoticeTitle(),
+ data,
+ "/system/notice",
+ null
+ ));
return R.ok();
}
diff --git a/ruoyi-modules/ruoyi-workflow/pom.xml b/ruoyi-modules/ruoyi-workflow/pom.xml
index 9034f9688..40d6b072b 100644
--- a/ruoyi-modules/ruoyi-workflow/pom.xml
+++ b/ruoyi-modules/ruoyi-workflow/pom.xml
@@ -20,7 +20,7 @@
org.dromara
- ruoyi-common-sse
+ ruoyi-common-push
diff --git a/ruoyi-modules/ruoyi-workflow/src/main/java/org/dromara/workflow/service/impl/FlwCommonServiceImpl.java b/ruoyi-modules/ruoyi-workflow/src/main/java/org/dromara/workflow/service/impl/FlwCommonServiceImpl.java
index f0797d228..d10a05717 100644
--- a/ruoyi-modules/ruoyi-workflow/src/main/java/org/dromara/workflow/service/impl/FlwCommonServiceImpl.java
+++ b/ruoyi-modules/ruoyi-workflow/src/main/java/org/dromara/workflow/service/impl/FlwCommonServiceImpl.java
@@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.core.domain.dto.PushPayload;
import org.dromara.common.core.domain.dto.UserDTO;
import org.dromara.common.core.enums.PushSourceEnum;
import org.dromara.common.core.enums.PushTypeEnum;
@@ -12,8 +13,7 @@ import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StreamUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.mail.utils.MailUtils;
-import org.dromara.common.sse.dto.SseMessageDTO;
-import org.dromara.common.sse.utils.SseMessageUtils;
+import org.dromara.common.push.helper.PushHelper;
import org.dromara.warm.flow.core.FlowEngine;
import org.dromara.warm.flow.core.entity.Node;
import org.dromara.warm.flow.orm.entity.FlowTask;
@@ -94,12 +94,12 @@ public class FlwCommonServiceImpl implements IFlwCommonService {
try {
switch (messageTypeEnum) {
case SYSTEM_MESSAGE -> {
- SseMessageDTO dto = new SseMessageDTO();
- dto.setUserIds(userIds);
- dto.setMessage(message);
- dto.setMessageType(PushTypeEnum.MESSAGE.getType());
- dto.setMessageSource(PushSourceEnum.WORKFLOW.getSource());
- SseMessageUtils.publishMessage(dto);
+ PushHelper.publishMessage(userIds, PushPayload.of(
+ PushTypeEnum.MESSAGE,
+ PushSourceEnum.WORKFLOW,
+ message,
+ null
+ ));
}
case EMAIL_MESSAGE -> MailUtils.sendText(emails, subject, message);
case SMS_MESSAGE -> {