From 261d00131eee082b619e71c2a520a2a7f44320cf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=96=AF=E7=8B=82=E7=9A=84=E7=8B=AE=E5=AD=90Li?=
<15040126243@163.com>
Date: Thu, 19 Mar 2026 10:14:08 +0800
Subject: [PATCH] =?UTF-8?q?add=20=E5=A2=9E=E5=8A=A0=20ruoyi-common-mqtt=20?=
=?UTF-8?q?=E6=A8=A1=E5=9D=97?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
pom.xml | 8 ++
ruoyi-common/pom.xml | 1 +
ruoyi-common/ruoyi-common-bom/pom.xml | 6 ++
ruoyi-common/ruoyi-common-mqtt/pom.xml | 34 +++++++++
.../mqtt/config/MqttAutoConfiguration.java | 73 ++++++++++++++++++
.../listener/MqttClientConnectListener.java | 37 +++++++++
.../MqttClientGlobalMessageListener.java | 23 ++++++
...ot.autoconfigure.AutoConfiguration.imports | 1 +
ruoyi-example/ruoyi-demo/pom.xml | 5 ++
.../demo/controller/MqttController.java | 76 +++++++++++++++++++
.../src/main/resources/application.yml | 40 ++++++++++
11 files changed, 304 insertions(+)
create mode 100644 ruoyi-common/ruoyi-common-mqtt/pom.xml
create mode 100644 ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java
create mode 100644 ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientConnectListener.java
create mode 100644 ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientGlobalMessageListener.java
create mode 100644 ruoyi-common/ruoyi-common-mqtt/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
create mode 100644 ruoyi-example/ruoyi-demo/src/main/java/org/dromara/demo/controller/MqttController.java
diff --git a/pom.xml b/pom.xml
index a74a5d754..1305a2722 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,8 @@
+ * 用法文档 ...
+ * 测试server搭建:
+ * 可执行下载其他mqtt服务端搭建
+ * 也可使用 mica自带的server搭建 ...
+ *
+ * @author Lion Li
+ */
+@AutoConfiguration
+@ConditionalOnProperty(value = "mqtt.client.enabled", havingValue = "true")
+public class MqttAutoConfiguration {
+
+ @Bean
+ public MqttClientConnectListener mqttClientConnectListener(MqttClientCreator mqttClientCreator) {
+ return new MqttClientConnectListener(mqttClientCreator);
+ }
+
+ @Bean
+ public MqttClientGlobalMessageListener mqttClientGlobalMessageListener() {
+ return new MqttClientGlobalMessageListener();
+ }
+
+ /**
+ * 客户端使用虚拟线程配置
+ */
+ @Bean
+ public MqttClientCustomizer mqttClientCustomizer() {
+ return creator -> {
+ // 这个数不重要 已经使用虚拟线程 就是填一下防止报错
+ int corePoolSize = ThreadUtils.CORE_POOL_SIZE;
+
+ ThreadFactory factory = new VirtualThreadTaskExecutor("tio-worker-virtual").getVirtualThreadFactory();
+ SynThreadPoolExecutor tioExecutor = new SynThreadPoolExecutor(corePoolSize, corePoolSize,
+ 0L, new LinkedBlockingQueue<>(), factory, new TioCallerRunsPolicy());
+ tioExecutor.prestartCoreThread();
+ creator.tioExecutor(tioExecutor);
+
+ ThreadFactory factory1 = new VirtualThreadTaskExecutor("tio-group-virtual").getVirtualThreadFactory();
+ ThreadPoolExecutor groupExecutor = new ThreadPoolExecutor(corePoolSize, corePoolSize,
+ 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory1, new TioCallerRunsPolicy());
+ groupExecutor.prestartCoreThread();
+ creator.groupExecutor(groupExecutor);
+
+ ThreadFactory factory2 = new VirtualThreadTaskExecutor("biz-worker-virtual").getVirtualThreadFactory();
+ ThreadPoolExecutor mqttExecutor = new ThreadPoolExecutor(corePoolSize, corePoolSize,
+ 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory2, new TioCallerRunsPolicy());
+ mqttExecutor.prestartCoreThread();
+ creator.mqttExecutor(mqttExecutor);
+ };
+ }
+
+}
diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientConnectListener.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientConnectListener.java
new file mode 100644
index 000000000..fd53eeabd
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientConnectListener.java
@@ -0,0 +1,37 @@
+package org.dromara.common.mqtt.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.mica.mqtt.core.client.IMqttClientConnectListener;
+import org.dromara.mica.mqtt.core.client.MqttClientCreator;
+import org.tio.core.ChannelContext;
+
+/**
+ * 客户端连接状态监听
+ *
+ * @author Lion Li
+ */
+@Slf4j
+public class MqttClientConnectListener implements IMqttClientConnectListener {
+ //
+ private final MqttClientCreator mqttClientCreator;
+
+ public MqttClientConnectListener(MqttClientCreator mqttClientCreator) {
+ this.mqttClientCreator = mqttClientCreator;
+ }
+
+ @Override
+ public void onConnected(ChannelContext context, boolean isReconnect) {
+ // 创建连接
+ log.info("MqttConnectedEvent:{}", context);
+ }
+
+ @Override
+ public void onDisconnect(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
+ // 离线时更新重连
+ log.info("MqttDisconnectEvent:{}", context, throwable);
+ // 在断线时更新 clientId、username、password
+// mqttClientCreator.clientId("newClient" + System.currentTimeMillis())
+// .username("newUserName")
+// .password("newPassword");
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientGlobalMessageListener.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientGlobalMessageListener.java
new file mode 100644
index 000000000..a3117eb74
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientGlobalMessageListener.java
@@ -0,0 +1,23 @@
+package org.dromara.common.mqtt.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
+import org.dromara.mica.mqtt.core.client.IMqttClientGlobalMessageListener;
+import org.tio.core.ChannelContext;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * 全局消息监听,可以监听到所有订阅消息
+ *
+ * @author Lion Li
+ */
+@Slf4j
+public class MqttClientGlobalMessageListener implements IMqttClientGlobalMessageListener {
+
+ @Override
+ public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
+ log.info("MqttGlobalMessageEvent => topic: {}, msg: {}", topic, new String(payload, StandardCharsets.UTF_8));
+ }
+
+}
diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-mqtt/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..265d0ddcc
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-mqtt/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.common.mqtt.config.MqttAutoConfiguration
diff --git a/ruoyi-example/ruoyi-demo/pom.xml b/ruoyi-example/ruoyi-demo/pom.xml
index c523db729..fe6429115 100644
--- a/ruoyi-example/ruoyi-demo/pom.xml
+++ b/ruoyi-example/ruoyi-demo/pom.xml
@@ -88,6 +88,11 @@
+ * 用法文档 ... + * 测试server搭建: + * 可执行下载其他mqtt服务端搭建 + * 也可使用 mica自带的server搭建 ... + * + * @author Lion Li + */ +@RequiredArgsConstructor +@RestController +@RequestMapping("/demo/mqtt") +@Slf4j +public class MqttController { + + @Lazy + @Autowired + private MqttClientTemplate client; + + @GetMapping("/send") + public boolean send() { + client.publish("/test/client", "测试测试".getBytes(StandardCharsets.UTF_8)); + return true; + } + + @MqttClientSubscribe("/test/#") + public void subQos0(String topic, byte[] payload) { + log.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8)); + } + + @MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.QOS1) + public void subQos1(String topic, byte[] payload) { + log.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8)); + } + + @MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register") + public void thingSubRegister(String topic, byte[] payload) { + // 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 + + // 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。 + log.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8)); + } + + @MqttClientSubscribe( + value = "/test/json", + deserialize = MqttJsonDeserializer.class // 2.4.5 开始支持 自定义序列化,默认 json 序列化 + ) + public void testJson(String topic, MqttPublishMessage message, TestDemo data) { + // 2.4.5 开始支持,支持 2 到 3 个参数,字段类型映射规则如下 + // String 字符串会默认映射到 topic, + // MqttPublishMessage 会默认映射到 原始的消息,可以拿到 mqtt5 的 props 参数 + // byte[] 会映射到 mqtt 消息内容 payload + // ByteBuffer 会映射到 mqtt 消息内容 payload + // 其他类型会走序列化,确保消息能够序列化,默认为 json 序列化 + log.info("topic:{} json data:{}", topic, data); + } + +} diff --git a/ruoyi-example/ruoyi-demo/src/main/resources/application.yml b/ruoyi-example/ruoyi-demo/src/main/resources/application.yml index 4313f433d..f30accfe1 100644 --- a/ruoyi-example/ruoyi-demo/src/main/resources/application.yml +++ b/ruoyi-example/ruoyi-demo/src/main/resources/application.yml @@ -116,3 +116,43 @@ easy-es: enable-track-total-hits: true # 数据刷新策略,默认为不刷新 refresh-policy: immediate + +--- # mqtt 配置 +# 具体配置还需查看文档 +# https://mica-mqtt.dreamlu.net/guide/spring/client.html +mqtt.client: + # 是否开启客户端,默认:true + enabled: false + # 连接的服务端 ip ,默认:127.0.0.1 + ip: 127.0.0.1 + # 端口:默认:1883 + port: 1883 + # 客户端名称 + name: Mqtt-Client + # 客户端Id(非常重要,一般为设备 sn,不可重复) + client-id: 000001 + username: ruoyi + password: 123456 + # 超时时间,单位:秒,默认:5秒 + timeout: 5 + # 重连时间,默认 5000 毫秒 + re-interval: 5000 + # mqtt 协议版本,可选 MQTT_3_1、mqtt_3_1_1、mqtt_5,默认:mqtt_3_1_1 + version: mqtt_3_1_1 + # 接收数据的 buffer size,默认:8k + read-buffer-size: 8KB + # 消息解析最大 bytes 长度,默认:10M + max-bytes-in-message: 10MB + # keep-alive 时间,单位:秒 + keep-alive-secs: 60 + # 开启保留 session 时,session 的有效期 + session-expiry-interval-secs: 0 + # 工作线程数,如果消息量比较大,例如做 emqx 的转发消息处理,可以调大此参数 + biz-thread-pool-size: 2 + # 是否开启 ssl + ssl: + enabled: false + keystore-path: + keystore-pass: + truststore-path: + truststore-pass: