update 重构 common-sse 与 common-websocket 合并为 ruoyi-common-push 推送模块

This commit is contained in:
疯狂的狮子Li
2026-03-26 17:25:36 +08:00
parent 40011e9acd
commit 029f6a4c11
45 changed files with 775 additions and 1132 deletions

View File

@@ -0,0 +1,17 @@
package org.dromara.common.push.config;
import org.dromara.common.push.properties.MessageProperties;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
/**
* 统一消息推送公共自动装配。
*
* @author Lion Li
*/
@AutoConfiguration
@ConditionalOnProperty(prefix = "message", name = "enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(MessageProperties.class)
public class MessageAutoConfiguration {
}

View File

@@ -0,0 +1,33 @@
package org.dromara.common.push.config;
import org.dromara.common.push.controller.SseController;
import org.dromara.common.push.core.SseEmitterSessionManager;
import org.dromara.common.push.listener.MessageTopicListener;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
/**
* SSE 消息推送自动装配。
*
* @author Lion Li
*/
@AutoConfiguration(after = MessageAutoConfiguration.class)
@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "sse", matchIfMissing = true)
public class MessageSseConfiguration {
@Bean
public SseEmitterSessionManager sseEmitterManager() {
return new SseEmitterSessionManager();
}
@Bean
public MessageTopicListener messageTopicListener(SseEmitterSessionManager manager) {
return new MessageTopicListener(manager);
}
@Bean
public SseController sseController(SseEmitterSessionManager manager) {
return new SseController(manager);
}
}

View File

@@ -0,0 +1,55 @@
package org.dromara.common.push.config;
import org.dromara.common.push.listener.MessageTopicListener;
import org.dromara.common.push.core.WebSocketSessionManager;
import org.dromara.common.push.handler.PlusWebSocketHandler;
import org.dromara.common.push.interceptor.PlusWebSocketInterceptor;
import org.dromara.common.push.properties.MessageProperties;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.server.HandshakeInterceptor;
/**
* WebSocket 消息推送自动装配。
*
* @author Lion Li
*/
@EnableWebSocket
@AutoConfiguration(after = MessageAutoConfiguration.class)
@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "websocket")
public class MessageWebSocketConfiguration {
@Bean
public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor,
WebSocketHandler webSocketHandler,
MessageProperties messageProperties) {
return registry -> registry
.addHandler(webSocketHandler, messageProperties.getPath())
.addInterceptors(handshakeInterceptor)
.setAllowedOrigins(messageProperties.getAllowedOrigins());
}
@Bean
public WebSocketSessionManager webSocketSessionManager() {
return new WebSocketSessionManager();
}
@Bean
public HandshakeInterceptor handshakeInterceptor() {
return new PlusWebSocketInterceptor();
}
@Bean
public WebSocketHandler webSocketHandler(WebSocketSessionManager webSocketSessionManager) {
return new PlusWebSocketHandler(webSocketSessionManager);
}
@Bean
public MessageTopicListener messageTopicListener(WebSocketSessionManager webSocketSessionManager) {
return new MessageTopicListener(webSocketSessionManager);
}
}

View File

@@ -0,0 +1,19 @@
package org.dromara.common.push.constant;
/**
* 模块通用消息常量定义。
*
* @author Lion Li
*/
public interface MessageConstants {
String LOGIN_USER_KEY = "loginUser";
String LOGIN_TOKEN_KEY = "token";
String MESSAGE_TOPIC = "global:message";
String PING = "ping";
String PONG = "pong";
}

View File

@@ -0,0 +1,92 @@
package org.dromara.common.push.controller;
import cn.dev33.satoken.annotation.SaIgnore;
import cn.dev33.satoken.stp.StpUtil;
import lombok.RequiredArgsConstructor;
import org.dromara.common.core.domain.R;
import org.dromara.common.push.core.SseEmitterSessionManager;
import org.dromara.common.satoken.utils.LoginHelper;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* SSE 控制器
*
* @author Lion Li
*/
@RestController
@RequiredArgsConstructor
public class SseController implements DisposableBean {
private final SseEmitterSessionManager sessionManager;
/**
* 建立当前登录用户的 SSE 连接。
*
* @return SSE 发射器
*/
@GetMapping(value = "${message.path:/resource/message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect() {
if (!StpUtil.isLogin()) {
return null;
}
String tokenValue = StpUtil.getTokenValue();
Long userId = LoginHelper.getUserId();
return sessionManager.connect(userId, tokenValue);
}
/**
* 关闭当前登录用户的 SSE 连接。
*
* @return 操作结果
*/
@SaIgnore
@GetMapping(value = "${message.path:/resource/message}/close")
public R<Void> close() {
String tokenValue = StpUtil.getTokenValue();
Long userId = LoginHelper.getUserId();
sessionManager.disconnect(userId, tokenValue);
return R.ok();
}
// 以下为demo仅供参考 禁止使用 请在业务逻辑中使用工具发送而不是用接口发送
// /**
// * 向特定用户发送消息
// *
// * @param userId 目标用户的 ID
// * @param msg 要发送的消息内容
// */
// @GetMapping(value = "${message.path:/resource/message}/send")
// public R<Void> send(Long userId, String msg) {
// PushDTO dto = new PushDTO();
// dto.setUserIds(List.of(userId));
// dto.setPayload(PushPayload.of("message", "backend", msg, null));
// sessionManager.publishMessage(dto);
// return R.ok();
// }
//
// /**
// * 向所有用户发送消息
// *
// * @param msg 要发送的消息内容
// */
// @GetMapping(value = "${message.path:/resource/message}/sendAll")
// public R<Void> send(String msg) {
// sessionManager.publishAll(msg);
// return R.ok();
// }
/**
* 容器销毁时释放资源占位实现。
*
* @throws Exception 销毁异常
*/
@Override
public void destroy() throws Exception {
// 销毁时不需要做什么 此方法避免无用操作报错
}
}

View File

@@ -0,0 +1,24 @@
package org.dromara.common.push.core;
import org.dromara.common.core.domain.dto.PushPayload;
import org.dromara.common.push.dto.PushDTO;
import java.util.function.Consumer;
/**
* 统一推送会话管理器。
*
* @author Lion Li
*/
public interface PushSessionManager {
void subscribeMessage(Consumer<PushDTO> consumer);
void sendMessage(Long userId, PushPayload payload);
void sendMessage(PushPayload payload);
void publishMessage(PushDTO pushDTO);
void publishAll(PushPayload payload);
}

View File

@@ -0,0 +1,273 @@
package org.dromara.common.push.core;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.dto.PushPayload;
import org.dromara.common.push.constant.MessageConstants;
import org.dromara.common.push.dto.PushDTO;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.redis.utils.RedisUtils;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* 管理 Server-Sent Events (SSE) 连接
*
* @author Lion Li
*/
@Slf4j
public class SseEmitterSessionManager implements PushSessionManager {
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
public SseEmitterSessionManager() {
// 定时执行 SSE 心跳检测
SpringUtils.getBean(ScheduledExecutorService.class)
.scheduleWithFixedDelay(this::sseMonitor, 60L, 60L, TimeUnit.SECONDS);
}
/**
* 建立与指定用户的 SSE 连接
*
* @param userId 用户的唯一标识符,用于区分不同用户的连接
* @param token 用户的唯一令牌,用于识别具体的连接
* @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件
*/
public SseEmitter connect(Long userId, String token) {
// 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表ConcurrentHashMap
// 每个用户可以有多个 SSE 连接,通过 token 进行区分
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
// 关闭已存在的SseEmitter防止超过最大连接数
SseEmitter oldEmitter = emitters.remove(token);
if (oldEmitter != null) {
oldEmitter.complete();
}
// 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞
SseEmitter emitter = new SseEmitter(86400000L);
emitters.put(token, emitter);
// 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token
emitter.onCompletion(() -> {
SseEmitter remove = emitters.remove(token);
if (remove != null) {
remove.complete();
}
});
emitter.onTimeout(() -> {
SseEmitter remove = emitters.remove(token);
if (remove != null) {
remove.complete();
}
});
emitter.onError((e) -> {
SseEmitter remove = emitters.remove(token);
if (remove != null) {
remove.complete();
}
});
try {
// 向客户端发送一条连接成功的事件
emitter.send(SseEmitter.event().comment("connected"));
} catch (IOException e) {
// 如果发送消息失败,则从映射表中移除 emitter
emitters.remove(token);
}
return emitter;
}
/**
* 断开指定用户的 SSE 连接
*
* @param userId 用户的唯一标识符,用于区分不同用户的连接
* @param token 用户的唯一令牌,用于识别具体的连接
*/
public void disconnect(Long userId, String token) {
if (userId == null || token == null) {
return;
}
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
if (MapUtil.isNotEmpty(emitters)) {
try {
SseEmitter sseEmitter = emitters.get(token);
sseEmitter.send(SseEmitter.event().comment("disconnected"));
sseEmitter.complete();
} catch (Exception ignore) {
}
emitters.remove(token);
} else {
USER_TOKEN_EMITTERS.remove(userId);
}
}
/**
* 执行 SSE 心跳检测并清理失效连接。
*/
public void sseMonitor() {
final SseEmitter.SseEventBuilder heartbeat = SseEmitter.event().comment("heartbeat");
// 记录需要移除的用户ID
List<Long> toRemoveUsers = new ArrayList<>();
USER_TOKEN_EMITTERS.forEach((userId, emitterMap) -> {
if (CollUtil.isEmpty(emitterMap)) {
toRemoveUsers.add(userId);
return;
}
emitterMap.entrySet().removeIf(entry -> {
try {
entry.getValue().send(heartbeat);
return false;
} catch (Exception ex) {
try {
entry.getValue().complete();
} catch (Exception ignore) {
// 忽略重复关闭异常
}
return true; // 发送失败 → 移除该连接
}
});
// 移除空连接用户
if (emitterMap.isEmpty()) {
toRemoveUsers.add(userId);
}
});
// 循环结束后统一清理空用户,避免并发修改异常
toRemoveUsers.forEach(USER_TOKEN_EMITTERS::remove);
}
/**
* 订阅 SSE 广播主题消息。
*
* @param consumer 处理SSE消息的消费者函数
*/
@Override
public void subscribeMessage(Consumer<PushDTO> consumer) {
RedisUtils.subscribe(MessageConstants.MESSAGE_TOPIC, PushDTO.class, consumer);
}
/**
* 向指定用户的全部本地 SSE 会话发送消息。
*
* @param userId 要发送消息的用户id
* @param message 要发送的消息内容
*/
public void sendMessage(Long userId, String message) {
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
if (MapUtil.isNotEmpty(emitters)) {
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
try {
entry.getValue().send(SseEmitter.event()
.name("message")
.data(message));
} catch (Exception e) {
SseEmitter remove = emitters.remove(entry.getKey());
if (remove != null) {
remove.complete();
}
}
}
} else {
USER_TOKEN_EMITTERS.remove(userId);
}
}
/**
* 向指定用户的全部本地 SSE 会话发送统一 JSON 消息。
*
* @param userId 要发送消息的用户id
* @param payload 要发送的消息体
*/
@Override
public void sendMessage(Long userId, PushPayload payload) {
sendMessage(userId, JsonUtils.toJsonString(payload));
}
/**
* 向指定用户的全部本地 SSE 会话发送统一 JSON 消息。
*
* @param userId 要发送消息的用户id
* @param pushDTO 要发送的消息内容
*/
public void sendMessage(Long userId, PushDTO pushDTO) {
if (pushDTO == null) {
return;
}
sendMessage(userId, pushDTO.getPayload());
}
/**
* 向当前节点所有 SSE 会话发送消息。
*
* @param message 要发送的消息内容
*/
public void sendMessage(String message) {
for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
sendMessage(userId, message);
}
}
/**
* 向当前节点所有 SSE 会话发送统一 JSON 消息。
*
* @param payload 要发送的消息体
*/
@Override
public void sendMessage(PushPayload payload) {
sendMessage(JsonUtils.toJsonString(payload));
}
/**
* 发布 SSE 订阅消息。
*
* @param pushDTO 要发布的SSE消息对象
*/
@Override
public void publishMessage(PushDTO pushDTO) {
RedisUtils.publish(MessageConstants.MESSAGE_TOPIC, pushDTO, consumer -> log.info(
"发送主题订阅消息topic:{} userIds:{} message:{}",
MessageConstants.MESSAGE_TOPIC,
pushDTO.getUserIds(),
pushDTO.getPayload() == null ? null : pushDTO.getPayload().getMessage()
));
}
/**
* 发布 SSE 广播消息。
*
* @param message 要发布的消息内容
*/
public void publishAll(String message) {
publishAll(PushPayload.of("message", "backend", message, null));
}
/**
* 发布 SSE 广播 JSON 消息。
*
* @param payload 要发布的消息体
*/
@Override
public void publishAll(PushPayload payload) {
PushDTO dto = new PushDTO();
dto.setPayload(payload);
RedisUtils.publish(MessageConstants.MESSAGE_TOPIC, dto, consumer -> {
log.info("发送主题订阅消息topic:{} type:{} source:{} message:{}",
MessageConstants.MESSAGE_TOPIC, payload.getType(), payload.getSource(), payload.getMessage());
});
}
}

View File

@@ -0,0 +1,168 @@
package org.dromara.common.push.core;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.dto.PushPayload;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.push.dto.PushDTO;
import org.dromara.common.redis.utils.RedisUtils;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.dromara.common.push.constant.MessageConstants.MESSAGE_TOPIC;
/**
* WebSocket 会话管理器。
*
* @author Lion Li
*/
@Slf4j
public class WebSocketSessionManager implements PushSessionManager {
private static final Map<Long, Map<String, WebSocketSession>> USER_TOKEN_SESSIONS = new ConcurrentHashMap<>();
public WebSocketSessionManager() {
SpringUtils.getBean(ScheduledExecutorService.class)
.scheduleWithFixedDelay(this::sessionMonitor, 60L, 60L, TimeUnit.SECONDS);
}
public void connect(Long userId, String token, WebSocketSession session) {
Map<String, WebSocketSession> sessions = USER_TOKEN_SESSIONS.computeIfAbsent(userId, key -> new ConcurrentHashMap<>());
WebSocketSession oldSession = sessions.remove(token);
closeSession(oldSession, CloseStatus.NORMAL);
sessions.put(token, session);
}
public void disconnect(Long userId, String token) {
if (userId == null || token == null) {
return;
}
Map<String, WebSocketSession> sessions = USER_TOKEN_SESSIONS.get(userId);
if (MapUtil.isEmpty(sessions)) {
USER_TOKEN_SESSIONS.remove(userId);
return;
}
closeSession(sessions.remove(token), CloseStatus.NORMAL);
if (sessions.isEmpty()) {
USER_TOKEN_SESSIONS.remove(userId);
}
}
public void sessionMonitor() {
List<Long> toRemoveUsers = new ArrayList<>();
USER_TOKEN_SESSIONS.forEach((userId, sessionMap) -> {
if (CollUtil.isEmpty(sessionMap)) {
toRemoveUsers.add(userId);
return;
}
sessionMap.entrySet().removeIf(entry -> {
WebSocketSession session = entry.getValue();
if (session == null || !session.isOpen()) {
closeSession(session, CloseStatus.NORMAL);
return true;
}
return false;
});
if (sessionMap.isEmpty()) {
toRemoveUsers.add(userId);
}
});
toRemoveUsers.forEach(USER_TOKEN_SESSIONS::remove);
}
@Override
public void subscribeMessage(Consumer<PushDTO> consumer) {
RedisUtils.subscribe(MESSAGE_TOPIC, PushDTO.class, consumer);
}
@Override
public void sendMessage(Long userId, PushPayload payload) {
if (payload == null) {
return;
}
Map<String, WebSocketSession> sessions = USER_TOKEN_SESSIONS.get(userId);
if (MapUtil.isEmpty(sessions)) {
USER_TOKEN_SESSIONS.remove(userId);
return;
}
sessions.entrySet().removeIf(entry -> {
WebSocketSession session = entry.getValue();
if (session == null || !session.isOpen()) {
closeSession(session, CloseStatus.NORMAL);
return true;
}
return !sendMessage(session, new TextMessage(JsonUtils.toJsonString(payload)));
});
if (sessions.isEmpty()) {
USER_TOKEN_SESSIONS.remove(userId);
}
}
@Override
public void sendMessage(PushPayload payload) {
USER_TOKEN_SESSIONS.keySet().forEach(userId -> sendMessage(userId, payload));
}
@Override
public void publishMessage(PushDTO pushDTO) {
RedisUtils.publish(MESSAGE_TOPIC, pushDTO, consumer -> log.info(
"WebSocket发送主题订阅消息topic:{} userIds:{} message:{}",
MESSAGE_TOPIC,
pushDTO.getUserIds(),
pushDTO.getPayload() == null ? null : pushDTO.getPayload().getMessage()
));
}
@Override
public void publishAll(PushPayload payload) {
PushDTO dto = new PushDTO();
dto.setPayload(payload);
publishMessage(dto);
}
public void sendPongMessage(WebSocketSession session) {
sendMessage(session, new PongMessage());
}
public void sendMessage(WebSocketSession session, String message) {
sendMessage(session, new TextMessage(message));
}
private boolean sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
if (session == null || !session.isOpen()) {
log.warn("[send] session会话已经关闭");
return false;
}
try {
session.sendMessage(message);
return true;
} catch (IOException e) {
log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
return false;
}
}
private void closeSession(WebSocketSession session, CloseStatus status) {
if (session == null) {
return;
}
try {
session.close(status);
} catch (Exception ignored) {
}
}
}

View File

@@ -0,0 +1,30 @@
package org.dromara.common.push.dto;
import lombok.Data;
import org.dromara.common.core.domain.dto.PushPayload;
import java.io.Serial;
import java.io.Serializable;
import java.util.List;
/**
* 统一推送 DTO。
*
* @author Lion Li
*/
@Data
public class PushDTO implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 目标用户 ID 列表,为空表示广播。
*/
private List<Long> userIds;
/**
* 推送消息体。
*/
private PushPayload payload;
}

View File

@@ -0,0 +1,32 @@
package org.dromara.common.push.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Arrays;
/**
* 消息推送传输方式。
*
* @author Lion Li
*/
@Getter
@AllArgsConstructor
public enum MessageTransportEnum {
SSE("sse"),
WEBSOCKET("websocket");
private final String code;
public boolean matches(String transport) {
return code.equalsIgnoreCase(transport);
}
public static MessageTransportEnum of(String transport) {
return Arrays.stream(values())
.filter(item -> item.matches(transport))
.findFirst()
.orElse(SSE);
}
}

View File

@@ -0,0 +1,104 @@
package org.dromara.common.push.handler;
import cn.hutool.core.util.ObjectUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.dto.PushPayload;
import org.dromara.common.core.domain.model.LoginUser;
import org.dromara.common.core.enums.PushSourceEnum;
import org.dromara.common.core.enums.PushTypeEnum;
import org.dromara.common.push.constant.MessageConstants;
import org.dromara.common.push.core.WebSocketSessionManager;
import org.dromara.common.push.dto.PushDTO;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
import java.io.IOException;
import java.util.List;
/**
* WebSocket Handler。
*
* @author Lion Li
*/
@RequiredArgsConstructor
@Slf4j
public class PlusWebSocketHandler extends AbstractWebSocketHandler {
private final WebSocketSessionManager webSocketSessionManager;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws IOException {
LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY);
String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY);
if (ObjectUtil.hasNull(loginUser, token)) {
session.close(CloseStatus.BAD_DATA);
log.info("[connect] invalid token received. sessionId: {}", session.getId());
return;
}
webSocketSessionManager.connect(
loginUser.getUserId(),
token,
new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64_000)
);
log.info("[connect] sessionId: {}, userId:{}, token:{}", session.getId(), loginUser.getUserId(), token);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY);
if (ObjectUtil.isNull(loginUser)) {
return;
}
if (MessageConstants.PING.equalsIgnoreCase(message.getPayload())) {
webSocketSessionManager.sendMessage(session, MessageConstants.PONG);
return;
}
PushDTO dto = new PushDTO();
dto.setUserIds(List.of(loginUser.getUserId()));
dto.setPayload(PushPayload.of(
PushTypeEnum.CUSTOM,
PushSourceEnum.CLIENT,
message.getPayload(),
null
));
webSocketSessionManager.publishMessage(dto);
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
super.handleBinaryMessage(session, message);
}
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) {
webSocketSessionManager.sendPongMessage(session);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
log.error("[transport error] sessionId: {}, exception:{}", session.getId(), exception.getMessage());
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY);
String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY);
if (ObjectUtil.hasNull(loginUser, token)) {
log.info("[disconnect] invalid token received. sessionId: {}", session.getId());
return;
}
webSocketSessionManager.disconnect(loginUser.getUserId(), token);
log.info("[disconnect] sessionId: {}, userId:{}, token:{}", session.getId(), loginUser.getUserId(), token);
}
@Override
public boolean supportsPartialMessages() {
return false;
}
}

View File

@@ -0,0 +1,80 @@
package org.dromara.common.push.helper;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.dromara.common.core.domain.dto.PushPayload;
import org.dromara.common.core.enums.PushSourceEnum;
import org.dromara.common.core.enums.PushTypeEnum;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.push.core.PushSessionManager;
import org.dromara.common.push.dto.PushDTO;
import java.util.List;
/**
* 统一消息推送工具。
*
* @author Lion Li
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class PushHelper {
public static void sendMessage(Long userId, String message) {
sendMessage(userId, buildMessage(message));
}
public static void sendMessage(String message) {
sendMessage(buildMessage(message));
}
public static void sendMessage(Long userId, PushPayload payload) {
if (!isEnabled()) {
return;
}
getSessionManager().sendMessage(userId, payload);
}
public static void sendMessage(PushPayload payload) {
if (!isEnabled()) {
return;
}
getSessionManager().sendMessage(payload);
}
public static void publishMessage(List<Long> userIds, PushPayload payload) {
PushDTO dto = new PushDTO();
dto.setUserIds(userIds);
dto.setPayload(payload);
publishMessage(dto);
}
public static void publishMessage(PushDTO dto) {
if (!isEnabled() || dto == null || dto.getPayload() == null) {
return;
}
getSessionManager().publishMessage(dto);
}
public static void publishAll(String message) {
publishAll(buildMessage(message));
}
public static void publishAll(PushPayload payload) {
if (!isEnabled()) {
return;
}
getSessionManager().publishAll(payload);
}
public static boolean isEnabled() {
return Boolean.TRUE.equals(SpringUtils.getProperty("message.enabled", Boolean.class, Boolean.TRUE));
}
private static PushSessionManager getSessionManager() {
return SpringUtils.getBean(PushSessionManager.class);
}
private static PushPayload buildMessage(String message) {
return PushPayload.of(PushTypeEnum.MESSAGE, PushSourceEnum.BACKEND, message, null);
}
}

View File

@@ -0,0 +1,57 @@
package org.dromara.common.push.interceptor;
import cn.dev33.satoken.exception.NotLoginException;
import cn.dev33.satoken.stp.StpUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.model.LoginUser;
import org.dromara.common.core.utils.ServletUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.push.constant.MessageConstants;
import org.dromara.common.satoken.utils.LoginHelper;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* WebSocket 握手拦截器。
*
* @author Lion Li
*/
@Slf4j
public class PlusWebSocketInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) {
try {
LoginUser loginUser = LoginHelper.getLoginUser();
String tokenValue = StpUtil.getTokenValue();
if (loginUser == null || StringUtils.isBlank(tokenValue)) {
return false;
}
String headerCid = ServletUtils.getRequest().getHeader(LoginHelper.CLIENT_KEY);
String paramCid = ServletUtils.getParameter(LoginHelper.CLIENT_KEY);
String clientId = StpUtil.getExtra(LoginHelper.CLIENT_KEY).toString();
if (!StringUtils.equalsAny(clientId, headerCid, paramCid)) {
throw NotLoginException.newInstance(StpUtil.getLoginType(),
"-100", "客户端ID与Token不匹配",
StpUtil.getTokenValue());
}
attributes.put(MessageConstants.LOGIN_USER_KEY, loginUser);
attributes.put(MessageConstants.LOGIN_TOKEN_KEY, tokenValue);
return true;
} catch (NotLoginException e) {
log.error("WebSocket 认证失败'{}',无法访问系统资源", e.getMessage());
return false;
}
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception exception) {
}
}

View File

@@ -0,0 +1,44 @@
package org.dromara.common.push.listener;
import cn.hutool.core.collection.CollUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.push.core.PushSessionManager;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
/**
* 统一消息主题订阅监听器。
*
* @author Lion Li
*/
@Slf4j
@RequiredArgsConstructor
public class MessageTopicListener implements ApplicationRunner, Ordered {
private final PushSessionManager pushSessionManager;
@Override
public void run(ApplicationArguments args) {
pushSessionManager.subscribeMessage(message -> {
log.info("消息主题订阅收到消息userIds={} message={}",
message.getUserIds(),
message.getPayload() == null ? null : message.getPayload().getMessage());
if (message.getPayload() == null) {
return;
}
if (CollUtil.isNotEmpty(message.getUserIds())) {
message.getUserIds().forEach(userId -> pushSessionManager.sendMessage(userId, message.getPayload()));
} else {
pushSessionManager.sendMessage(message.getPayload());
}
});
log.info("初始化消息主题订阅监听器成功");
}
@Override
public int getOrder() {
return -1;
}
}

View File

@@ -0,0 +1,35 @@
package org.dromara.common.push.properties;
import lombok.Data;
import org.dromara.common.push.enums.MessageTransportEnum;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* 统一消息推送配置。
*
* @author Lion Li
*/
@Data
@ConfigurationProperties("message")
public class MessageProperties {
/**
* 是否启用消息推送。
*/
private Boolean enabled = true;
/**
* 传输方式sse / websocket。
*/
private String transport = MessageTransportEnum.SSE.getCode();
/**
* 统一访问路径。
*/
private String path = "/resource/message";
/**
* WebSocket 允许的跨域来源。
*/
private String allowedOrigins = "*";
}

View File

@@ -0,0 +1,3 @@
org.dromara.common.push.config.MessageAutoConfiguration
org.dromara.common.push.config.MessageSseConfiguration
org.dromara.common.push.config.MessageWebSocketConfiguration