!163 将spring-cloud-stream改为普通的mq依赖用法

* update: sky日志
* update: RocketMQ的集成方式
* feat:1. rabbit: 普通消息、延迟队列
This commit is contained in:
Xbhog
2024-06-03 14:50:47 +00:00
committed by 疯狂的狮子Li
parent 16ca219267
commit 0dac5a544f
30 changed files with 649 additions and 360 deletions

View File

@@ -0,0 +1,21 @@
package org.dromara.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
/**
* SpringBoot-MQ 案例项目
* @author Lion Li
*/
@SpringBootApplication
public class RuoYiTestMqApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(RuoYiTestMqApplication.class);
application.setApplicationStartup(new BufferingApplicationStartup(2048));
application.run(args);
System.out.println("(♥◠‿◠)ノ゙ MQ案例模块启动成功 ლ(´ڡ`ლ)゙ ");
}
}

View File

@@ -0,0 +1,54 @@
package org.dromara.stream.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author xbhog
*/
@Configuration
public class RabbitConfig {
public static final String EXCHANGE_NAME = "demo-exchange";
public static final String QUEUE_NAME = "demo-queue";
public static final String ROUTING_KEY = "demo.routing.key";
/**
* 创建交换机
* ExchangeBuilder有四种交换机模式
* Direct Exchange直连交换机根据Routing Key(路由键)进行投递到不同队列。
* Fanout Exchange扇形交换机采用广播模式根据绑定的交换机路由到与之对应的所有队列。
* Topic Exchange主题交换机对路由键进行模式匹配后进行投递符号#表示一个或多个词,*表示一个词。
* Header Exchange头交换机不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。
* durable 交换器是否持久化false 不持久化true 持久化)
**/
@Bean
public TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME);
}
/**
* 创建队列
* durable 队列是否持久化 队列调用此方法就是持久化 可查看方法的源码
* deliveryMode 消息是否持久化1 不持久化2 持久化)
**/
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, false);
}
/**
* 绑定交换机和队列
* bing 方法参数可以是队列和交换机
* to 方法参数必须是交换机
* with 方法参数是路由Key 这里是以rabbit.开头
* noargs 就是不要参数的意思
* 这个方法的意思是把rabbit开头的消息 和 上面的队列 和 上面的交换机绑定
**/
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
}

View File

@@ -0,0 +1,69 @@
package org.dromara.stream.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitTTL队列
* @author xbhog
*/
@Configuration
public class RabbitTtlQueueConfig {
// 延迟队列名称
public static final String DELAY_QUEUE_NAME = "delay-queue";
// 延迟交换机名称
public static final String DELAY_EXCHANGE_NAME = "delay-exchange";
// 延迟路由键名称
public static final String DELAY_ROUTING_KEY = "delay.routing.key";
// 死信交换机名称
public static final String DEAD_LETTER_EXCHANGE = "dlx-exchange";
// 死信队列名称
public static final String DEAD_LETTER_QUEUE = "dlx-queue";
// 死信路由键名称
public static final String DEAD_LETTER_ROUTING_KEY = "dlx.routing.key";
// 延迟消息的默认 TTL毫秒
@Value("${rabbitmq.delay.ttl:5000}")
private long messageTTL;
// 声明延迟队列
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY)
.withArgument("x-message-ttl", messageTTL)
.build();
}
// 声明延迟交换机
@Bean
public TopicExchange delayExchange() {
return new TopicExchange(DELAY_EXCHANGE_NAME);
}
// 将延迟队列绑定到延迟交换机
@Bean
public Binding delayBinding(Queue delayQueue, TopicExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY);
}
// 声明死信队列
@Bean
public Queue deadLetterQueue() {
return new Queue(DEAD_LETTER_QUEUE);
}
// 声明死信交换机
@Bean
public TopicExchange deadLetterExchange() {
return new TopicExchange(DEAD_LETTER_EXCHANGE);
}
// 将死信队列绑定到死信交换机
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY);
}
}

View File

@@ -0,0 +1,72 @@
package org.dromara.stream.controller;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.dromara.stream.mq.producer.kafkaMq.KafkaNormalProducer;
import org.dromara.stream.mq.producer.rabbitMq.DelayRabbitProducer;
import org.dromara.stream.mq.producer.rabbitMq.NormalRabbitProducer;
import org.dromara.stream.mq.producer.rocketMq.NormalRocketProducer;
import org.dromara.stream.mq.producer.rocketMq.TransactionRocketProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author xbhog
*/
@Slf4j
@RestController
@RequestMapping("push/message")
public class PushMessageController {
@Resource
private NormalRabbitProducer normalRabbitProducer;
@Resource
private DelayRabbitProducer delayRabbitProducer;
@Resource
private NormalRocketProducer normalRocketProducer;
@Resource
private TransactionRocketProducer transactionRocketProducer;
@Resource
private KafkaNormalProducer normalKafkaProducer;
/**
* rabbit普通消息的处理
*/
@GetMapping("/rabbitMsg/sendNormal")
public void sendMq() {
normalRabbitProducer.sendMq("hello normal RabbitMsg");
}
/**
* rabbit延迟队列类型类似生产者
*/
@GetMapping("/rabbitMsg/sendDelay")
public void sendMessage() {
delayRabbitProducer.sendDelayMessage("Hello ttl RabbitMsg");
}
/**
* rockerMQ实例
* 需要手动创建相关的Topic和group
*/
@GetMapping("/rocketMq/send")
public void sendRockerMq(){
normalRocketProducer.sendMessage();
}
@GetMapping("/rocketMq/transactionMsg")
public void sendRockerMqTransactionMsg(){
transactionRocketProducer.sendTransactionMessage();
}
/**
* kafkaSpringboot集成
*/
@GetMapping("/kafkaMsg/send")
public void sendKafkaMsg(){
normalKafkaProducer.sendKafkaMsg();
}
}

View File

@@ -0,0 +1,24 @@
package org.dromara.stream.mq.consumer.kafkaMq;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @author xbhog
* @date 2024/05/19 18:04
**/
@Slf4j
@Component
public class KafkaNormalConsumer {
//默认获取最后一条消息
@KafkaListener(topics = "test-topic",groupId = "demo")
public void timiKafka(ConsumerRecord record){
Object key = record.key();
Object value = record.value();
log.info("【消费者】received the message key {}value{}",key,value);
}
}

View File

@@ -0,0 +1,36 @@
package org.dromara.stream.mq.consumer.rabbit;
import lombok.extern.slf4j.Slf4j;
import org.dromara.stream.config.RabbitConfig;
import org.dromara.stream.config.RabbitTtlQueueConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author xbhog
* @date 2024年5月18日
*/
@Slf4j
@Component
public class ConsumerListener {
/**
* 设置监听哪一个队列 这个队列是RabbitConfig里面设置好的队列名字
* 普通消息
**/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void listenQueue(Message message) {
log.info("【消费者】Start consuming data{}",new String(message.getBody()));
}
/**
* 处理延迟队列的操作
* 该部分处理的延迟操作在消费上的时间可能与设置的TTl不同
* 一般会超长;原因是消息可能并不会按时死亡;可通过延迟队列插件处理
*/
@RabbitListener(queues = RabbitTtlQueueConfig.DEAD_LETTER_QUEUE)
public void receiveMessage(String message){
log.info("【消费者】Received delayed message{}",message);
}
}

View File

@@ -0,0 +1,20 @@
package org.dromara.stream.mq.consumer.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author xbhog
* @date 2024/06/01 16:53
**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "springboot-mq-consumer-1")
public class NormalRocketConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("【消费者】接收消息:{}" ,message);
}
}

View File

@@ -0,0 +1,21 @@
package org.dromara.stream.mq.consumer.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author xbhog
* @date 2024/06/01 16:54
**/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction_topic")
public class TransactionRocketConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("【消费者】===>接收事务消息:{}",message);
}
}

View File

@@ -0,0 +1,42 @@
package org.dromara.stream.mq.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* @author xbhog
* @date 2024/06/01 17:05
**/
@Slf4j
@Component
@RocketMQTransactionListener
public class TranscationRocketListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("执行本地事务");
String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));
if ("TAG-1".equals(tag)) {
//这里只讲TAGA消息提交状态为可执行
log.info("【监听器】这里是校验TAG-1;提交状态:COMMIT");
return RocketMQLocalTransactionState.COMMIT;
} else if ("TAG-2".equals(tag)) {
log.info("【监听器】这里是校验TAG-2;提交状态:ROLLBACK");
return RocketMQLocalTransactionState.ROLLBACK;
} else if ("TAG-3".equals(tag)) {
log.info("【监听器】这里是校验TAG-3;提交状态:UNKNOWN");
return RocketMQLocalTransactionState.UNKNOWN;
}
log.info("=========【监听器】提交状态:UNKNOWN");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
log.info("【监听器】检查本地交易===>{}", message);
return RocketMQLocalTransactionState.COMMIT;
}
}

View File

@@ -0,0 +1,24 @@
package org.dromara.stream.mq.producer.kafkaMq;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
/**
* @author xbhog
* @date 2024/05/19 18:02
**/
@Slf4j
@Component
public class KafkaNormalProducer {
@Resource
private KafkaTemplate kafkaTemplate;
public void sendKafkaMsg(){
CompletableFuture send = kafkaTemplate.send("test-topic","hello", "kafkaTest");
send.join();
}
}

View File

@@ -0,0 +1,25 @@
package org.dromara.stream.mq.producer.rabbitMq;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.dromara.stream.config.RabbitTtlQueueConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
/**
* @author xbhog
* @date 2024/05/25 17:15
**/
@Slf4j
@Component
public class DelayRabbitProducer {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendDelay")
public void sendDelayMessage(String message) {
rabbitTemplate.convertAndSend(RabbitTtlQueueConfig.DELAY_EXCHANGE_NAME, RabbitTtlQueueConfig.DELAY_ROUTING_KEY, message);
log.info("【生产者】Delayed message send: " + message);
}
}

View File

@@ -0,0 +1,24 @@
package org.dromara.stream.mq.producer.rabbitMq;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.dromara.stream.config.RabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* @author xbhog
*/
@Slf4j
@Component
public class NormalRabbitProducer {
@Resource
RabbitTemplate rabbitTemplate;
public void sendMq(String message) {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message);
log.info("【生产者】Message send: " + message);
}
}

View File

@@ -0,0 +1,25 @@
package org.dromara.stream.mq.producer.rocketMq;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @author xbhog
* @date 2024/06/01 16:49
**/
@Slf4j
@Component
public class NormalRocketProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(){
SendResult sendResult = rocketMQTemplate.syncSend("TestTopic", MessageBuilder.withPayload("hello world test").build());
log.info("发送普通同步消息-msgsyncSendMessage===>{}", sendResult);
}
}

View File

@@ -0,0 +1,41 @@
package org.dromara.stream.mq.producer.rocketMq;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* @author xbhog
* @date 2024/06/01 16:54
**/
@Slf4j
@Component
public class TransactionRocketProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendTransactionMessage(){
List<String> tags = Arrays.asList("TAG-1", "TAG-2", "TAG-3");
for (int i = 0; i < 3; i++) {
Message<String> message = MessageBuilder.withPayload("===>事务消息-" + i).build();
//destination formats: `topicName:tags` message message Message arg ext arg
TransactionSendResult res = rocketMQTemplate.sendMessageInTransaction("transaction_topic:" + tags.get(i), message, i + 1);
if (res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) {
log.info("【生产者】事物消息发送成功;成功结果:{}",res);
}else{
log.info("【生产者】事务发送失败:失败原因:{}",res);
}
}
}
}