diff --git a/pom.xml b/pom.xml index a74a5d754..1305a2722 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,8 @@ 8.7.3-20251210 1.8.4 + + 2.5.11 2.3.4 @@ -382,6 +384,12 @@ ${warm-flow.version} + + org.dromara.mica-mqtt + mica-mqtt-client-spring-boot-starter + ${mica-mqtt.version} + + diff --git a/ruoyi-common/pom.xml b/ruoyi-common/pom.xml index d862f76ea..7eb53dc53 100644 --- a/ruoyi-common/pom.xml +++ b/ruoyi-common/pom.xml @@ -43,6 +43,7 @@ ruoyi-common-nacos ruoyi-common-bus ruoyi-common-sse + ruoyi-common-mqtt ruoyi-common diff --git a/ruoyi-common/ruoyi-common-bom/pom.xml b/ruoyi-common/ruoyi-common-bom/pom.xml index 1eac811a0..8d76cedfc 100644 --- a/ruoyi-common/ruoyi-common-bom/pom.xml +++ b/ruoyi-common/ruoyi-common-bom/pom.xml @@ -244,6 +244,12 @@ ${revision} + + org.dromara + ruoyi-common-mqtt + ${revision} + + diff --git a/ruoyi-common/ruoyi-common-mqtt/pom.xml b/ruoyi-common/ruoyi-common-mqtt/pom.xml new file mode 100644 index 000000000..25a6d0679 --- /dev/null +++ b/ruoyi-common/ruoyi-common-mqtt/pom.xml @@ -0,0 +1,34 @@ + + + + org.dromara + ruoyi-common + ${revision} + + 4.0.0 + + ruoyi-common-mqtt + + + ruoyi-common-mqtt mqtt模块 + + + + + org.dromara + ruoyi-common-core + + + + org.dromara + ruoyi-common-json + + + + org.dromara.mica-mqtt + mica-mqtt-client-spring-boot-starter + + + diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java new file mode 100644 index 000000000..11f352988 --- /dev/null +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java @@ -0,0 +1,73 @@ +package org.dromara.common.mqtt.config; + +import org.dromara.common.mqtt.listener.MqttClientConnectListener; +import org.dromara.common.mqtt.listener.MqttClientGlobalMessageListener; +import org.dromara.mica.mqtt.core.client.MqttClientCreator; +import org.dromara.mica.mqtt.core.client.MqttClientCustomizer; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.core.task.VirtualThreadTaskExecutor; +import org.tio.utils.thread.ThreadUtils; +import org.tio.utils.thread.pool.SynThreadPoolExecutor; +import org.tio.utils.thread.pool.TioCallerRunsPolicy; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * mqtt客户端配置初始化 + *

+ * 用法文档 ... + * 测试server搭建: + * 可执行下载其他mqtt服务端搭建 + * 也可使用 mica自带的server搭建 ... + * + * @author Lion Li + */ +@AutoConfiguration +@ConditionalOnProperty(value = "mqtt.client.enabled", havingValue = "true") +public class MqttAutoConfiguration { + + @Bean + public MqttClientConnectListener mqttClientConnectListener(MqttClientCreator mqttClientCreator) { + return new MqttClientConnectListener(mqttClientCreator); + } + + @Bean + public MqttClientGlobalMessageListener mqttClientGlobalMessageListener() { + return new MqttClientGlobalMessageListener(); + } + + /** + * 客户端使用虚拟线程配置 + */ + @Bean + public MqttClientCustomizer mqttClientCustomizer() { + return creator -> { + // 这个数不重要 已经使用虚拟线程 就是填一下防止报错 + int corePoolSize = ThreadUtils.CORE_POOL_SIZE; + + ThreadFactory factory = new VirtualThreadTaskExecutor("tio-worker-virtual").getVirtualThreadFactory(); + SynThreadPoolExecutor tioExecutor = new SynThreadPoolExecutor(corePoolSize, corePoolSize, + 0L, new LinkedBlockingQueue<>(), factory, new TioCallerRunsPolicy()); + tioExecutor.prestartCoreThread(); + creator.tioExecutor(tioExecutor); + + ThreadFactory factory1 = new VirtualThreadTaskExecutor("tio-group-virtual").getVirtualThreadFactory(); + ThreadPoolExecutor groupExecutor = new ThreadPoolExecutor(corePoolSize, corePoolSize, + 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory1, new TioCallerRunsPolicy()); + groupExecutor.prestartCoreThread(); + creator.groupExecutor(groupExecutor); + + ThreadFactory factory2 = new VirtualThreadTaskExecutor("biz-worker-virtual").getVirtualThreadFactory(); + ThreadPoolExecutor mqttExecutor = new ThreadPoolExecutor(corePoolSize, corePoolSize, + 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory2, new TioCallerRunsPolicy()); + mqttExecutor.prestartCoreThread(); + creator.mqttExecutor(mqttExecutor); + }; + } + +} diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientConnectListener.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientConnectListener.java new file mode 100644 index 000000000..fd53eeabd --- /dev/null +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientConnectListener.java @@ -0,0 +1,37 @@ +package org.dromara.common.mqtt.listener; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.mica.mqtt.core.client.IMqttClientConnectListener; +import org.dromara.mica.mqtt.core.client.MqttClientCreator; +import org.tio.core.ChannelContext; + +/** + * 客户端连接状态监听 + * + * @author Lion Li + */ +@Slf4j +public class MqttClientConnectListener implements IMqttClientConnectListener { + // + private final MqttClientCreator mqttClientCreator; + + public MqttClientConnectListener(MqttClientCreator mqttClientCreator) { + this.mqttClientCreator = mqttClientCreator; + } + + @Override + public void onConnected(ChannelContext context, boolean isReconnect) { + // 创建连接 + log.info("MqttConnectedEvent:{}", context); + } + + @Override + public void onDisconnect(ChannelContext context, Throwable throwable, String remark, boolean isRemove) { + // 离线时更新重连 + log.info("MqttDisconnectEvent:{}", context, throwable); + // 在断线时更新 clientId、username、password +// mqttClientCreator.clientId("newClient" + System.currentTimeMillis()) +// .username("newUserName") +// .password("newPassword"); + } +} diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientGlobalMessageListener.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientGlobalMessageListener.java new file mode 100644 index 000000000..a3117eb74 --- /dev/null +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/listener/MqttClientGlobalMessageListener.java @@ -0,0 +1,23 @@ +package org.dromara.common.mqtt.listener; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.mica.mqtt.codec.message.MqttPublishMessage; +import org.dromara.mica.mqtt.core.client.IMqttClientGlobalMessageListener; +import org.tio.core.ChannelContext; + +import java.nio.charset.StandardCharsets; + +/** + * 全局消息监听,可以监听到所有订阅消息 + * + * @author Lion Li + */ +@Slf4j +public class MqttClientGlobalMessageListener implements IMqttClientGlobalMessageListener { + + @Override + public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) { + log.info("MqttGlobalMessageEvent => topic: {}, msg: {}", topic, new String(payload, StandardCharsets.UTF_8)); + } + +} diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-mqtt/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 000000000..265d0ddcc --- /dev/null +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.dromara.common.mqtt.config.MqttAutoConfiguration diff --git a/ruoyi-example/ruoyi-demo/pom.xml b/ruoyi-example/ruoyi-demo/pom.xml index c523db729..fe6429115 100644 --- a/ruoyi-example/ruoyi-demo/pom.xml +++ b/ruoyi-example/ruoyi-demo/pom.xml @@ -88,6 +88,11 @@ ruoyi-common-sensitive + + org.dromara + ruoyi-common-mqtt + + diff --git a/ruoyi-example/ruoyi-demo/src/main/java/org/dromara/demo/controller/MqttController.java b/ruoyi-example/ruoyi-demo/src/main/java/org/dromara/demo/controller/MqttController.java new file mode 100644 index 000000000..2e0e8c27a --- /dev/null +++ b/ruoyi-example/ruoyi-demo/src/main/java/org/dromara/demo/controller/MqttController.java @@ -0,0 +1,76 @@ +package org.dromara.demo.controller; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.demo.domain.TestDemo; +import org.dromara.mica.mqtt.codec.MqttQoS; +import org.dromara.mica.mqtt.codec.message.MqttPublishMessage; +import org.dromara.mica.mqtt.core.annotation.MqttClientSubscribe; +import org.dromara.mica.mqtt.core.deserialize.MqttJsonDeserializer; +import org.dromara.mica.mqtt.spring.client.MqttClientTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.nio.charset.StandardCharsets; + +/** + * mqtt 演示案例 + *

+ * 用法文档 ... + * 测试server搭建: + * 可执行下载其他mqtt服务端搭建 + * 也可使用 mica自带的server搭建 ... + * + * @author Lion Li + */ +@RequiredArgsConstructor +@RestController +@RequestMapping("/demo/mqtt") +@Slf4j +public class MqttController { + + @Lazy + @Autowired + private MqttClientTemplate client; + + @GetMapping("/send") + public boolean send() { + client.publish("/test/client", "测试测试".getBytes(StandardCharsets.UTF_8)); + return true; + } + + @MqttClientSubscribe("/test/#") + public void subQos0(String topic, byte[] payload) { + log.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8)); + } + + @MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.QOS1) + public void subQos1(String topic, byte[] payload) { + log.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8)); + } + + @MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register") + public void thingSubRegister(String topic, byte[] payload) { + // 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 + + // 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。 + log.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8)); + } + + @MqttClientSubscribe( + value = "/test/json", + deserialize = MqttJsonDeserializer.class // 2.4.5 开始支持 自定义序列化,默认 json 序列化 + ) + public void testJson(String topic, MqttPublishMessage message, TestDemo data) { + // 2.4.5 开始支持,支持 2 到 3 个参数,字段类型映射规则如下 + // String 字符串会默认映射到 topic, + // MqttPublishMessage 会默认映射到 原始的消息,可以拿到 mqtt5 的 props 参数 + // byte[] 会映射到 mqtt 消息内容 payload + // ByteBuffer 会映射到 mqtt 消息内容 payload + // 其他类型会走序列化,确保消息能够序列化,默认为 json 序列化 + log.info("topic:{} json data:{}", topic, data); + } + +} diff --git a/ruoyi-example/ruoyi-demo/src/main/resources/application.yml b/ruoyi-example/ruoyi-demo/src/main/resources/application.yml index 4313f433d..f30accfe1 100644 --- a/ruoyi-example/ruoyi-demo/src/main/resources/application.yml +++ b/ruoyi-example/ruoyi-demo/src/main/resources/application.yml @@ -116,3 +116,43 @@ easy-es: enable-track-total-hits: true # 数据刷新策略,默认为不刷新 refresh-policy: immediate + +--- # mqtt 配置 +# 具体配置还需查看文档 +# https://mica-mqtt.dreamlu.net/guide/spring/client.html +mqtt.client: + # 是否开启客户端,默认:true + enabled: false + # 连接的服务端 ip ,默认:127.0.0.1 + ip: 127.0.0.1 + # 端口:默认:1883 + port: 1883 + # 客户端名称 + name: Mqtt-Client + # 客户端Id(非常重要,一般为设备 sn,不可重复) + client-id: 000001 + username: ruoyi + password: 123456 + # 超时时间,单位:秒,默认:5秒 + timeout: 5 + # 重连时间,默认 5000 毫秒 + re-interval: 5000 + # mqtt 协议版本,可选 MQTT_3_1、mqtt_3_1_1、mqtt_5,默认:mqtt_3_1_1 + version: mqtt_3_1_1 + # 接收数据的 buffer size,默认:8k + read-buffer-size: 8KB + # 消息解析最大 bytes 长度,默认:10M + max-bytes-in-message: 10MB + # keep-alive 时间,单位:秒 + keep-alive-secs: 60 + # 开启保留 session 时,session 的有效期 + session-expiry-interval-secs: 0 + # 工作线程数,如果消息量比较大,例如做 emqx 的转发消息处理,可以调大此参数 + biz-thread-pool-size: 2 + # 是否开启 ssl + ssl: + enabled: false + keystore-path: + keystore-pass: + truststore-path: + truststore-pass: