当前位置: 首页 > news >正文

RocketMQ+Spring Boot的简单实现及其深入分析

Producer搭建

  1. 导入RocketMQ依赖和配置RocketMQ地址及producer的group:name
        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.4</version></dependency>

image1

  1. 创建消费接口

image2

1. 调用接口进行测试

image3

    ## 发送消息模式的类型扩展> `RocketTemplate`中有许多发送方法,其可应对大多数的场景> 

image4

    ### syncSend()> 同步发送,仅当发送过程完成时返回此方法.需严格保证顺序性,其会阻塞调用线程至到Broker获取响应> - 参数列表- `destination`目标主题,格式为`topicName:tags`tags可选- `payload`消息体,可以是任意对象,自动序列化- `message`Spring Message对象,可自定义headers- `timeout`发送超时时间毫秒,默认3000ms- 返回对象- SendResult:包含消息ID,发送状态,队列偏移值等等- 用于大部分对发送结果严格的场景:如电商,金融等等### asyncSend()> 异步发送,没有返回对象.异步传输一般用于响应时间敏感的业务场景.在发送完成会立即调用其参数列表中的sendCallBack方法> - 参数列表- `String destination`- `Message<?> message`- `SendCallback sendCallback`:发送结果调用方法- `onSuccess(SendResult result)`:发送成功回调- `onException(Throwable e)`:发送失败回调- 适用于:高吞吐,但对结果要求不高的场景如日志采集等等

image5

    ### syncSendOrderly()> 顺序发送> - 参数列表- `SendResult syncSendOrderly(String destination, Message<?> message, String hashKey);`- `SendResult syncSendOrderly(String destination, Object payload, String hashKey);`- `hasyKey`:分片见,相同的hashKey会被路由到同一个队列- 基本原理:`int queueId = Math.abs(hashKey.hashCode()) % queueCount;`# SendMesssageInTransaction()> 发送MQ分布式事务消息,其采用2PC(两端式协议)+补偿机制(事务回查)的分布式事务功能> - 半事务消息:暂不能投递的消息,消息生产者已经成功将消息发送到RocketMQ服务器中,但暂时为收到生产者对消息的二次确认.此时的消息会被标记为”暂不能投递”的状态.处于这种”暂不能投递”状态的消息被称为半事务消息- 消息回查:由于一些网络问题,生产者自身的问题等等,导致某条事务消息二次丢失,RocketMQ通过扫描某条消息长期处于”半事务消息”时,其会向生产者组询查该消息的最终状态(commit或Rollback),这就是消息回查- 在RocketMQ中发送食物消息需要三个核心组件1. 事务消息发送:使用sendMessageInTransaction()方法

image6

        2. 事务监听器:实现RocketMQLocalTransactionListener接口3. 事务监听注册:通过@RocketMQTransactionListener注解注册

image7

            - 返回对象:`TransactionSendResult`:含事务状态`LocalTransactionState`- 采用这一套事务消息发送逻辑,本地的Service只需关心发送消息的逻辑,其余的事务逻辑交由给事务监听器处理

image8

    ### 事务基本执行流程- **第一阶段:发送半事务**1. 生产者发送半事务消息:生产者将业务数据封装成数据,并将其发送给RocketMQ,此时消息被标记为”半事务消息”2. RocketMQ确认接收消息:RocketMQ接收到消息并将其持久化到存储系统中,此时会向生产者发送一个确认消息(Ack)表示该消息已经被接收3. 生产者执行本地事务逻辑:生产者接收到服务端的确认后,则开始本地业务逻辑执行.如更新数据库,修改订单等等- **第二阶段:提交或回滚事务**1. 生产者提交二次确认结果:根据本地事务执行结果,生产者向RocketMQ提交二次确认结果1. 若本地事务执行成功:生产者提交`Commit`操作,服务器端将半事务标记为:”可投递状态”,并将其投递给消费者2. 如果本地事务执行失败:生产者提交`Rollback`操作,RocketMQ则会回滚,不会将消息投递给消费者2. 但0由于网络问题生产者自身应用问题导致重启,RocketMQ迟迟未收到生产者的二次确认,或收到的消息结果为`Unknown`未知状态.RocketMQ会发起事务回查.1. RocketMQ会向生产者发送回查请求,要求查询其本地事务状态2. 生产者根据本地事务状态再次提交二次确认结果- **第三阶段:消费者进行消费**1. 当RocketMQ中的消息被标记为”可投递”之后,消息会被投递到消费者.消费者按其消费逻辑进行消费操作.最后向RocketMQ发送消费结果(成功/失败)2. 消息被消费后,RocketMQ会标记其消息为”已消费”,RocketMQ会默认保留所有消息.支持消费者回溯历史消息

image9

    ### 幂等问题> 幂等性,值对同一操作多次执行,结果与仅执行一次效果相同> - 出现幂等的原因:1. **生产者重复发送**:生产者客户端有可能因为某些网络问题导致发送失败,届时生产者会尝试发送相同的消息从而会导致RocketMQ重复消费2. **重试机制**:RocketMQ提供了消息重试机制,在消息发送中出现异常时.消费者会重新拉取相同的消息进行重试.若消费者方没有处理幂等性,则消息会被重复消费3. **集群下的消息重复消费**:在RocketMQ下的集群,如果多个消费者订阅相同的主题,且每个消费者都独立消费消息,那么同一个消息就会被不同的消费者组重复消费### 使用Redssion实现幂等性```javaconsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String msgId = msg.getMsgId();String lockKey = "rocketmq:msg:" + msgId;RLock lock = redissonClient.getLock(lockKey);boolean acquired = false;try {acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);if (acquired) {System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);Thread.sleep(100); // 模拟业务处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} else {System.out.println("Duplicate message skipped: " + msgId);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}} catch (Exception e) {System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());return ConsumeConcurrentlyStatus.RECONSUME_LATER;} finally {if (acquired) {lock.unlock();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});``````javaconsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String msgId = msg.getMsgId();String lockKey = "rocketmq:msg:" + msgId;RLock lock = redissonClient.getLock(lockKey);boolean acquired = false;try {acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);if (acquired) {System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);Thread.sleep(100);return ConsumeOrderlyStatus.SUCCESS;} else {System.out.println("Duplicate message skipped: " + msgId);return ConsumeOrderlyStatus.SUCCESS;}} catch (Exception e) {System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;} finally {if (acquired) {lock.unlock();}}}return ConsumeOrderlyStatus.SUCCESS;}});```### sendAndReceive()- 用于实现请求-响应模式的核心方法,其允许在分布式系统中实现类似RCP同步通信的能力

image10

    - 核心特性- 同步通信:阻塞调用线程直到收到响应- 双向交互:实现生产者与消费者的双向通信- 解耦设计:保持MQ解耦特性同时实现同步交互- 参数列表Message<?> sendAndReceive(`String destination,Message<?> requestMessage,long timeout`) throws MessagingException- 业务场景:实时查询库存信息

Consumer搭建

  1. 引入依赖配置consumer的group:name
        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.4</version></dependency>

image11

  1. 创建消息监听器

实现RocketMQListener接口,重写其onMessage()方法完成消费逻辑

  • 使用@RocketMQMessageListener(consumerGroup=””,topic=””)注解:来指定消费者组,及目标topic

image12

http://www.hskmm.com/?act=detail&tid=37688

相关文章:

  • RFSOC学习记录(五)带通采样定理
  • 3dmax下载安装教程及激活教程(附安装包)3dmax2025超详细下载安装步骤
  • LLM 场景下的强化学习技术扫盲
  • vmware虚拟机下载安装教程(付安装包)详细图文下载安装教程
  • deepin 25 虚拟机安装vgpu客户机驱动
  • NXP S32K118的FTM模块分析
  • 66页作业
  • 写电商详情页不用挠头了:一个还算实用的AI指令模板
  • CF2153D
  • 20232417 2025-2026-1 《网络与系统攻防技术》实验二实验报告
  • iPhone口袋状态检测技术揭秘
  • 搜维尔科技:IROS 2025现场,触觉力反馈、数据手套遥操作机器人灵巧手平台系统解决方案
  • 一些题解
  • Node.js JSON import attributes All In One
  • DeepSeek的“认知提纯”能力解析
  • 梦熊知更鸟赛水题题解合集 (两个人的演唱会 使一颗心免于哀伤 空气蛹)
  • CF2154D
  • Plya 定理学习笔记 | ABC428G 题解
  • 第十八天
  • 第十七天
  • vue3+elementPlus el-date-picker 自定义禁用状态hook 建立结束时间不能小于开始时间
  • [优先队列] P3611 [USACO17JAN] Cow Dance Show S 题解
  • 搜维尔科技将携手Xsens|Haption|Tesollo|Manus亮相IROS 2025国际智能机器人与系统会议
  • 【做题记录】贪心--提高组
  • 如何炫酷地使用集合划分容斥
  • 简单云计算算法--20251023
  • 处理空输入踩的坑
  • latex输入公式
  • 【为美好CTF献上祝福】 New Star 2025 逆向笔记
  • XXL-JOB(5)