基础篇
1. 初识MQ
1.1 同步调用
同步调用是一种线性执行模式。当你调用一个函数后,程序会暂停在当前位置,直到这个函数执行完毕并返回结果后,才会继续执行下一行代码。这就像你在餐厅点餐后,站在柜台前一直等到厨师做好餐品拿到手后才离开.
缺点:
- 拓展性差: 拓展服务需要更改通知代码
- 性能下降: 串行执行, 效果慢
- 级联失败: 前面服务失败, 后面服务也失败
使用场景: 下一步操作需要上一步操作的结果才使用同步调用, 否则可优化为异步调用
1.2 异步调用
异步调用是一种非阻塞的执行模式。发出调用后,程序不会傻等,而是立即继续执行后续代码。被调用的函数(或任务)会在后台(例如在另一个线程中)执行,当它完成时,会通过一种通知机制(如回调函数、事件或消息)来告知调用方结果已就绪 。这就好比你在餐厅点餐后,拿到一个取餐号,然后可以回座位玩手机,当餐准备好时,服务员会叫号通知你取餐 .
对于消息队列的异步调用, 一般包含三个角色:
- 消息发送者: 消息生产者
- 消息代理: 管理, 暂存, 转发消息
- 消息接收者: 消息消费者
优点:
- 解除耦合, 拓展性强
- 无需等待, 性能好
- 故障隔离
- 缓存消息, 流量削峰填谷
缺点:
- 时效性差
- 无法确认下游服务对消息的处理情况
- 业务安全依赖于Broker(消息代理/消息队列) 的可靠性
特性维度 | 同步调用 | 异步调用 |
---|---|---|
核心机制 | 调用后必须等待返回结果才继续执行 | 调用后无需等待,可立即执行后续操作 |
执行时序 | 强时序性,顺序执行,上下文一致 | 非线性,完成顺序不确定,可能先调用的后完成 |
线程状态 | 调用线程可能被阻塞(挂起) | 调用线程非阻塞,可自由执行其他任务 |
结果获取 | 直接通过函数返回值获取 | 通过回调函数、事件通知、Future/Promise等方式获取 |
资源利用率 | 较低,等待期间线程资源可能闲置 | 较高,线程资源可被充分利用 |
代码复杂度 | 逻辑简单直观,易于理解和调试 | 相对复杂,需要处理回调地狱、线程安全等问题 |
典型应用 | 简单的顺序任务、短时间操作 | 高并发服务、I/O密集型任务、GUI应用 |
1.3 MQ技术选型
MQ (MessageQueue)
,中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。
特性维度 | Kafka | RabbitMQ | RocketMQ | ActiveMQ |
---|---|---|---|---|
核心协议 | 自定义协议 | AMQP, MQTT, STOMP | 自研协议 | JMS, AMQP, MQTT |
吞吐量 | 极高 (百万级TPS) | 中等 (万级TPS) | 高 (十万级TPS) | 低 (万级TPS) |
延迟 | 较高 (毫秒-秒级) | 极低 (毫秒级) | 低 (毫秒级) | 毫秒级 |
可靠性 | 高 (多副本机制) | 高 (ACK机制) | 极高 (金融级) | 中 (依赖配置) |
事务消息 | 不支持 | 插件支持 | 原生支持 | 支持 |
顺序消息 | 分区内有序 | 单队列有序 | 分区内严格有序 | 单队列有序 |
扩展性 | 水平扩展极佳 | 集群扩展复杂 | 水平扩展良好 | 垂直扩展为主 |
学习成本 | 高 | 中 (文档详细, 社区支持全面) | 中 | 低 |
2. RabbitMQ
2.1 介绍和安装
- 基本介绍
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmg.com/
- 核心概念
- publisher: 消息发送者
- consumer: 消息消费者
- queue: 队列, 存储消息
- exchange: 交换机, 负责消息的路由
- binding: 交换机绑定队列
- routing key: 路由条件
2.2 快速入门
- 不挂载
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 ----restart=always rabbitmq:management
- 使用数据卷挂载
docker run -e RABBITMQ_DEFAULT_USER=fei \
-e RABBITMQ_DEFAULT_PASS=fei \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
--restart=always \
rabbitmq:management
使用数据卷挂载:mq-plugins是数据卷名,那么已有的插件会复制到这个数据卷中。
这里不存在该数据卷,会自动创建该数据卷。存放的主机目录可通过
docker inspect mq-plugins
进行查看
操作类型 | 核心命令 | 关键参数/说明 |
---|---|---|
创建数据卷 | docker volume create [卷名] |
创建命名的数据卷,便于管理 |
挂载数据卷 | docker run -v [卷名]:[容器路径] ... |
将数据卷挂载到容器内指定路径 |
查看数据卷 | docker volume ls , docker volume inspect [卷名] |
列出所有卷或查看详细信息 |
删除数据卷 | docker volume rm [卷名] |
删除指定的数据卷 |
清理无用卷 | docker volume prune |
清理所有未被容器引用的数据卷 |
- 新建队列hello.queue1和hello.queue2
- 向默认的amp.fanout交换机发送一条消息
- 查看消息是否到达hello.queue1和hello.queue2
- 总结规律
2.3 数据隔离
需求:在RabbitMQ的控制台完成下列操作
- 新建一个用户hmall
- 为hmall用户创建一个virtual host
- 测试不同virtualhost之间的数据隔离现象
只有本用户创建的virtual host 才能被本用户访问
3. Java客户端
3.1 快速入门
需求如下
-
利用控制台创建队列simple.queue
-
在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
-
在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列
-
引入依赖
<!--引入RabbitMQ : AMQP(高级消息队列协议) Advanced-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置
# 在生产者或者消费者引入MQ服务端信息
spring:rabbitmq:host: 192.168.56.2port: 5672 virtual-host: "/" # 虚拟主机username: guestpassword: guest
- 发送消息
@Autowired
private RabbitTemplate rabbitTemplate;@Test
void contextLoads() {rabbitTemplate.convertAndSend("hello.queue1", "hello from java");
}
- 消息接收
SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:
@Component
@Slf4j
public class SpringRabbitMQListener {@RabbitListener(queues = {"hello.queue1"})public void receive(String message) {log.info("receive message: {}", message);}}
3.2 WorkQueue
案例: 一个队列绑定多个消费者
基本思路如下:
- 在RabbitMQ的控制台创建一个队列,名为work.queue
- 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
- 在consumer服务中定义两个消息监听者,都监听work.queue队列
- 消费者1每秒处理50条消息,消费者2每秒处理5条消息
- 生产者
@Test
void testWQ(){for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend("work.queue", "hello from java = " + i);try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
- 消费者
@RabbitListener(queues = {"work.queue"})public void receiveWQ1(String message) {System.out.println("消费者1 接收work.queue = " + message);}@RabbitListener(queues = {"work.queue"})public void receiveWQ2(String message) throws InterruptedException {System.err.println("消费者2 接收work.queue = " + message);TimeUnit.MILLISECONDS.sleep(200);// 1s / 5 == 200 ms}
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息 # 能者多劳
3.3 Fanout交换机
真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,常用的交换机类型有以下三种:
- Fanout: 广播
- Direct: 定向
- Topic: 话题
- headers
- x-local-random
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
案例
- 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
- 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
- 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
- 在publisher中编写测试方法,向hmall.fanout发送消息
- 总结: 交换机的作用
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- FanoutExchange的会将消息路由到每个绑定的队列
3.4 Direct交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
案例
- 在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
- 在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定
- 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
- 在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息
@Testvoid testDirect(){rabbitTemplate.convertAndSend("hmall.direct", "dq1", "direct test"); // 只有按dq1绑定的queue可以接收}
3.5 Topic交换机
TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以.
分割。
Queue与Exchange指定BindingKey时可以使用通配符
#
: 代指0个或多个单词*
: 代指一个单词
例子:
china.#
: 以china开头的routingkey, 比如china.sc.bz
,china.sc
都可以路由到china.*
: 以china 开头的routingkey, 只有china.sc
可以接收到,china.sc.bz
接收不到
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey可以是多个单词,以
.
分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
#
: 代表0个或多个词*
: 代表1个词
3.6 队列交换机的声明
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
-
Queue:用于声明队列,可以用工厂类QueueBuilder构建
-
Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
-
Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
-
配置声明方式
package com.fei.mq.publisher.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {/*** 声明FanoutExchange交换机* @return*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("mall.fanout");}/*** 声明队列* @return*/@Beanpublic Queue Queue1() {return new Queue("mall.queue1");}/*** 声明绑定关系* @param Queue1 已声明的队列* @param fanoutExchange 已声明的交换机* @return*/@Beanpublic Binding bindingQueue(Queue Queue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(Queue1).to(fanoutExchange);}
}
- 注解声明方式
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "inject.queue1"),exchange = @Exchange(name = "inject.direct",type = ExchangeTypes.DIRECT),key = {"inject.d","d.i"})
)
public void receiveDirect(String message) {System.out.println("消费者接收到Direct消息:" + message);
}
-
声明队列、交换机、绑定关系的Bean是什么?
-
Queue
-
FanoutExchange、DirectExchange、TopicExchange
-
Binding
-
-
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
- @Queue
- @Exchange
3.7 消息转换器
需求: 测试利用SpringAMQP发送对象类型的消息
- 声明一个队列,名为object.queue
- 编写单元测试,向队列中直接发送一条消息,消息类型为Map
- 在控制台查看消息,总结你能发现的问题
Spring的对消息对象的处理是出org.springframework.amqp.support.converter.MessageConverter
来处理的。而默认实现是SimpleMessageConverter
,基于JDK的ObjectOutputStream
完成序列化。存在下列问题:
- JDK的序列化有安全风险
- JDK序列化的消息太大
- JDK序列化的消息可读性差
建议采用JSON序列化代替默认的JDK序列化,要做两件事情:在publisher和consumer中都要引入jackson依赖:
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
@Bean
public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();
}
- 发送者
@Testvoid testMap(){HashMap<String, String> map = new HashMap<>();map.put("name","jack");rabbitTemplate.convertAndSend( "hmall.topic","china", map);}
- 消费者
@RabbitListener(queues = "topic.queue1")public void receiveTopic(Map<String,String > message) {System.out.println("消费者 map 绑定 = " + message);}
消息发送者和消息消费者处理消息的类型需要保持一致
案例:
需求: 改造余额支付功能,不再同步调用交易服务的0penFeign接口,而是采用异步MQ通知交易服务
- 订单服务 - 消费者
@Component
@RequiredArgsConstructor
public class PayStatusListener {private final OrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "mark.queue.pay.queue", durable = "true"),exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),key = "pay.success"))public void listen(Long orderId){orderService.markOrderPaySuccess(orderId);}}
- 支付服务 - 生产者
@Testvoid testPay(){rabbitTemplate.convertAndSend("pay.topic","pay.success",910101010);}
高级篇
4. 消息可靠性
4.1 发送者的可靠性
4.1.1 生产者重连
有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:
server:port: 8056
# 在生产者或者消费者引入MQ服务端信息
spring:rabbitmq:host: 192.168.56.2port: 5672virtual-host: "/" # 虚拟主机username: guestpassword: guestlistener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息connection-timeout: 1s # 设置MQ连接超时时间template:retry:enabled: true # 开启超时重连机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次等待时长的倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
4.1.2 生产者确认
RabbitMQ有Publisher Confirm
和Publisher Return
两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
- 消息投递到了MQ,但是路由失败。此时会通过
Publisher Return
返回路由异常原因,然后返回ACK
,告知投递成功 - 临时消息投递到了MQ,并且入队成功,返回
ACK
,告知投递成功 - 持久消息投递到了MQ,并且入队完成持久化,返回
ACK
,告知投递成功 - 其它情况都会返回
NACK
,告知投递失败
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-return: true # 开启publisher return 机制
配置说明
这里
publisher-confirm-type
有三种模式:
none
: 关闭confirm机制simple
: 同步阻塞等待MQ的回执消息correlated
: MQ异步回调方式返回回执信息
ReturnCallback
: 每个RabbitTemplate只能配置一个ReturnCallback
, 因此需要在项目启动中配置- 消息成功抵达交换机,但无法路由到任何队列(全局性路由失败)
- 全局的、基础保障型的回调。类似于一个“故障报警器”,监听某种特定类型的系统问题。
- 通过
rabbitTemplate.setReturnsCallback()
全局设置一次,通常在配置类中完成
@Beanpublic RabbitTemplate initTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setReturnsCallback((msg)->{log.info(msg.toString());});rabbitTemplate.setMessageConverter(messageConverter());return rabbitTemplate;}
-
ConfrimCallback
: 发送消息, 指定消息ID, 消息ConfirmCallback-
在每次调用
convertAndSend()
方法时,通过CorrelationData
动态关联 -
ConfirmCallback
用于确认消息是否成功抵达交换机(ack
/nack
)。它的处理逻辑则复杂和个性化得多. -
与具体业务逻辑相关的、可定制化的回调。类似于给每个寄出的快递单独设置“送达通知”。
-
// 发送消息1:支付消息 | 老版本
CorrelationData payData = new CorrelationData("ORDER_PAY_001");
payData.getFuture().addCallback(result -> {if (result.isAck()) {log.debug("支付消息发送成功");} else {// 支付消息发送失败的特殊处理逻辑,如通知支付系统handlePayMessageFailure(result.getReason());}
});
rabbitTemplate.convertAndSend("exchange.pay", "routing.pay", payMessage, payData);
// 发送消息1:支付消息 | 新版本@Testvoid testConfirmCallback(){// 发送消息1:支付消息CorrelationData payData = new CorrelationData("ORDER_PAY_001");payData.getFuture().thenAccept(result->{if (result.isAck()) {System.out.println("消息发送成功 callback ack");} else {// 支付消息发送失败的特殊处理逻辑,如通知支付系统System.out.println("消息发送失败 callback ack");}});rabbitTemplate.convertAndSend("pay.topic", "pay.success", "支付消息001", payData);}
4.1.3 总结
- SpringAMQP中生产者消息确认的几种返回值情况:
- 消息投递到了MQ,但是路由失败。会return路由异常原因,返回ACK
- 临时消息投递到了MQ,并且入队成功,返回ACK
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK
- 其它情况都会返回NACK,告知投递失败
- 如何处理生产者的确认消息?
- 生产者确认需要额外的网络和系统资源开销,尽量不要使用, 除非对消息的可靠性有较高的要求
- 如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题
- 对于nack消息可以有限次数重试,依然失败则记录异常消息
4.2 MQ的可靠性
在默认情况下,RabbitMO会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
- 一旦MO宕机,内存中的消息会丢失
- 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞
4.2.1 数据持久化
RabbitMQ实现数据持久化包括3个方面:
- 交换机持久化 ( 默认持久化) : 交换机属性设置为持久化, 保证交换机持久化存在, 否则重启交换机消失
- 队列持久化 (默认持久化) : 队列设置为持久化, 保证队列持久化存在, 否则重启队列消失
- 消息持久化:
delivery_mode: 2
4.2.3 Lazy Queue
从RabbitMQ的3.6.0版本开始,就增加了LazyQueue的概念,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
- 在3.12版本后,所有队列都是LazyOueue模式,无法更改。
要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:
// 基于Bean
@Bean
public Queue lazyQueue(){return QueueBuilder.durable("queue.name").lazy().build();
}
// 基于注解@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "mark.queue.pay.queue", durable = "true"),exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),arguments = @Argument(name = "x-queue-mode",value = "lazy"),key = "pay.success"))public void listen(Long orderId){orderService.markOrderPaySuccess(orderId);}
4.3 消费者的可靠性
4.3.1 消费者确认机制
当消费者为了确认消费者是否成功处理消息,RabbitMO提供了消费者确认机制(Consumer Acknowledgement)处理消息结束后,应该向RabbitMO发送一个回执,告知RabbitM0自己消息处理状态。回执有三种可选值:
ack
: 成功处理消息,RabbitMO从队列中删除该消息nack
: 消息处理失败,RabbitMO需要再次投递消息reject
: 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
SpringAMOP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
none
: 不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用manual
: 手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活auto
: 自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常,会自动返回nack
- 如果是消息处理或校验异常,自动返回reject
spring:rabbitmq:host: 192.168.56.2port: 5672virtual-host: "/" # 虚拟主机username: guestpassword: guestlistener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息acknowledge-mode: manual # 手动确认接受消息,不确认不成功处理
4.3.2 消费失败处理
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mq的消息处理飙升,带来不必要的压力。
们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列:
server:port: 8056
# 在生产者或者消费者引入MQ服务端信息
spring:rabbitmq:host: 192.168.56.2port: 5672virtual-host: "/" # 虚拟主机username: guestpassword: guestlistener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息acknowledge-mode: auto # manual # 手动确认接受消息,不确认不成功处理retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始化的失败等待时长为1秒multiplier: 1 # 失败后下次等待时长的倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 #大重试次数stateless: true # 无状态。 如果有事务操作,此处改为falseconnection-timeout: 1s # 设置MQ连接超时时间template:retry:enabled: true # 开启超时重连机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次等待时长的倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
什么是无状态服务?
无状态服务就像是一个“金鱼脑”的厨师:你每次点餐都需要告诉他完整的订单,他做完这份餐后就会忘记你。下一次你来,即使间隔只有一秒,他也需要你重新报一遍菜单。
- 技术本质:服务实例在处理请求时,不依赖任何之前交互产生的数据。请求本身必须包含服务所需的所有信息(如认证Token、用户ID等)。
- 优势:这种设计带来了极佳的可扩展性。因为每个实例都一样,你可以通过简单地增加实例数量来应对高流量,负载均衡器可以将请求发送到任意一个可用的实例上。同时,故障恢复也很快,一个实例宕机,新的实例可以立刻顶替上来。
- 典型场景:RESTful API 是无状态设计的典范。每个HTTP请求都携带认证头和所有必要参数。微服务架构中的大部分业务逻辑服务(如用户查询、订单计算)也通常被设计为无状态的,以便独立部署和伸缩。
什么是有状态服务?
有状态服务则像是一位“记忆力超群”的私人管家。你第一次告诉他你的喜好后,之后每次服务,他都记得你的习惯,无需你重复。
- 技术本质:服务实例在内存或本地存储中保存了客户端的会话状态。后续请求的处理依赖于这些保存的信息。
- 挑战:这导致扩展性复杂。如果你有多个实例,用户第一次请求被实例A处理,其状态保存在A上;用户的第二次请求如果被负载均衡器发给了实例B,B由于没有该用户的状态,就会认为用户未登录或会话无效,从而导致错误。因此,必须使用“粘性会话”等技术确保同一用户的请求总是发往同一个实例。
- 典型场景:传统单机应用、关系型数据库(如MySQL主从复制)、以及需要维持长期连接的场景(如WebSocket通信)通常是有状态的。
总之, 无状态服务就是无记忆服务, 有状态服务就是有记忆服务
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer
: 重试耗尽后,直接reiect,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer
: 重试耗尽后,返回nack,消息重新入队RepublishMessageRecoverer
: 重试耗尽后,将失败消息投递到指定的交换机error.exchange => error.queue
(1)自动处理
acknowledge-mode: auto
将失败处理策略改为RepublishMessageRecoverer
:
-
首先,定义接收失败消息的交换机、队列及其绑定关系
-
然后,定义
RepublishMessageRecoverer
:
package com.fei.mq.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true") // 只有在配置true方可生效
public class MQErrorConfig {@Beanpublic Queue errorQueue() {return new Queue("error.queue");}@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct");}@Beanpublic Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}/*** 消息恢复 重试耗尽后,将失败消息投递到指定的交换机 `error.exchange => error.queue`* @param rabbitTemplate* @return*/@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "mark.queue.pay.queue", durable = "true"),exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),
// arguments = @Argument(name = "x-queue-mode",value = "lazy"),key = "pay.success"))public void listen(Long orderId, Channel channel, Message message) throws IOException, RuntimeException {orderService.markOrderPaySuccess(orderId);System.out.println("消息处理中");throw new RuntimeException("故意的"); // 发生异常时,AOP增强将消息发送给error.exchange交换机}
(2)手动处理
acknowledge-mode: manual
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "mark.queue.pay.queue", durable = "true"),exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),key = "pay.success"))public void listen(Long orderId, Channel channel, Message message) throws IOException, RuntimeException {try{orderService.markOrderPaySuccess(orderId);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);System.out.println("消息处理成功");}catch (IOException e) {System.out.println("异常发生,消息退回消息队列");channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}
(3)总结
消费者如何保证消息一定被消费?
- 开启消费者确认机制为auto,由spring确认消息处理成功后返回ack,异常时返回nack
- 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
4.3.3 业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
(1)唯一消息ID
方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库(Redis) 查询判断是否存在,存在则为重复消息放弃处理
@Bean
public MessageConverter messageConverter() {Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();converter.setCreateMessageIds(true); // UUID | 多节点时,应当使用全局IDreturn converter;
}
(2)业务状态判断
方案二,是结合业务逻辑,基于业务本身做判断。以我们的业务为例: 我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理。
(3)总结
如何保证支付服务与交易服务之间的订单状态一致性?
- 消息通知:首先,支付服务会在用户支付成功以后利用MQ消息通知交易服务完成订单状态同步。
- 避免消息丢失:其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失
- 幂等判断:最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常
如果交易服务消息处理失败,有没有什么兜底方案?
- 定时任务:我们可以在交易服务设置定时任务,定期查询订单支付状态。这样即便MO通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
4.4 延迟消息
延迟消息: 生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务: 设置在一定时间之后才执行的任务
4.4.1 死信交换机
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter)
- 消费失败:消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为false - 消息过期:消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费要投递的队列
- 消息积压:消息堆积满了,最早的消息可能成为死信
如果队列通过x-dead-letter-exchange
属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead LetterExchange,简称DLX)。
关键点:
- 两个交换机、两个队列,一个消费者
- 第一个交换机是普通交换机
- 死信队列为普通队列,只是设置了
x-dead-letter-exchange
, 以及消息的过期时间
- 第二个交换机是一个普通交换机
分析可知,实际上可以使用一个交换机,两个不同的bindingkey即可。
- 组件配置
package com.fei.mq.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayMQConfig {private static final String DEAD_LETTER_EXCHANGE = "order-event-exchange";private static final String DEAD_LETTER_QUEUE = "order.delay.queue"; private static final int DEAD_TTL = 10000; // 过期时间,毫秒private static final String RELEASE_QUEUE = "order.release.queue"; // 死信队列private static final String DEAD_LETTER_CREATE = "order.create.order"; // 普通路由, 从生产者发送到交换机,交换机通过此路由,路由到死信队列(无消费者队列)private static final String DEAD_LETTER_RELEASE = "order.release.order"; // 死信路由 == 2 次路由: 第一次从死信队列路由到交换机, 第二次又从交换机路由到普通队列/*** 普通路由, 通过此路由,将消息发送到交换机,由交换机路由到死信队列(无消费者队列)* @return*/@Beanpublic Binding createBinding(){return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_CREATE);}/*** 交换机,延迟消息中唯一的交换机,两个用处:* 第一: 将生产者的消息路由到死信队列* 第二: 将死信队列的消息路由到普通队列* @return*/@Beanpublic TopicExchange deadLetterExchange() {return new TopicExchange(DEAD_LETTER_EXCHANGE);}/*** 死信队列。需要设置:* 1. 死信队列名* 2. 死信交换机名* 3. 死信路由键* 4. 消息过期时间* @return*/@Beanpublic Queue deadLetterQueue() {return QueueBuilder.durable(DEAD_LETTER_QUEUE) // 死信队列名.deadLetterExchange(DEAD_LETTER_EXCHANGE) // 交换机名.deadLetterRoutingKey(DEAD_LETTER_RELEASE) // 死信路由 == 2 次路由: 第一次从死信队列路由到交换机, 第二次又从交换机路由到普通队列.ttl(DEAD_TTL) // the time to live (milliseconds)..build();}/*** 发布路由,此路由将死信交换机与普通队列(有消费者队列)绑定。路由键名需要和死信路由键保持相同。* 此路由将死信交换机的消息路由到普通队列* @return*/@Beanpublic Binding releaseBinding(){return BindingBuilder.bind(releaseQueue()).to(deadLetterExchange()).with(DEAD_LETTER_RELEASE);}/*** 普通队列(有消费者的队列)* @return*/@Beanpublic Queue releaseQueue(){return QueueBuilder.durable(RELEASE_QUEUE).build();}
}
- 生产者
@Testvoid testDelayMsg(){rabbitTemplate.convertAndSend("order-event-exchange","order.create.order","延迟消息 + " + new Date());}
- 消费者
@Component
public class DelayMessageListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "order.release.queue", durable = "true"),exchange = @Exchange(name = "order-event-exchange", type = ExchangeTypes.TOPIC),
// arguments = @Argument(name = "x-queue-mode",value = "lazy"),key = "order.release.order"))public void receive(String msg) {System.out.println(msg);System.out.println("定时消息处理成功");}
}
4.4.2 延迟消息插件
Community Plugins | RabbitMQ
RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
安装:
- 将安装包放入插件目录 :
docker inspect mq-plugins
查看挂载目录- 执行
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 生产者
@Testvoid testDelayMsg2(){rabbitTemplate.convertAndSend("delayed.exchange", "delay.key","插件路由延迟消息", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(10000L);// 毫秒return message;}});System.out.println("消息发送成功---");}
- 消费者
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delayed.mq", durable = "true"),exchange = @Exchange(name = "delayed.exchange", type = ExchangeTypes.TOPIC, delayed = "true"),key = "delay.key"))public void receiveDelayByPlugin(String msg) {System.out.println(msg);System.out.println("延迟消息处理成功");}
4.4.3 取消超时订单
5. RabbitMQ集群
RabbitMQ 集群通过将多个 RabbitMQ 节点(服务器)组织在一起,形成一个逻辑上的消息代理(Broker),旨在提供高可用性、容错性和可扩展性。
🔍 集群核心概念与工作原理
数据同步:元数据与消息数据
如架构图所示,RabbitMQ 集群中,所有节点都保存相同的元数据(Metadata),包括交换机、队列的定义、绑定关系、用户权限等。这使得客户端可以连接至集群中任何一个节点,都能看到完整的结构。然而,队列中的消息内容本身并不会在所有节点间自动复制。消息只存储在创建该队列的节点(主节点)上。当客户端连接到一个不包含该队列数据的节点(非主节点)时,集群内部会进行消息的路由转发。
节点类型:磁盘节点与内存节点
- 磁盘节点:将元数据存储在磁盘中。集群中必须至少有一个磁盘节点,通常建议配置两个或以上,以防止单点故障导致整个集群无法修改配置。
- 内存节点:将元数据存储在内存中,性能更好,但节点重启后元数据会丢失。它依赖磁盘节点来持久化元数据信息。
⚙️ 集群模式详解
1. 普通集群模式
这是默认模式。如图左侧所示,队列的消息实体仅存在于其主节点上。其他节点仅保存元数据和指向主节点的指针。
- 优点:提高了集群的总体消息吞吐能力,因为消息可以被分片到不同节点上。
- 缺点:无法保证高可用。如果某个队列的主节点宕机,那么该队列及其消息将不可用,直到节点恢复。
2. 镜像集群模式
这是实现高可用性的推荐方案。如图右侧“镜像队列数据流”部分所示,它基于普通集群,通过配置镜像队列,使得队列中的消息会被同步到集群中的多个节点上。
- 工作方式:一个队列有且仅有一个主节点,但可以配置一个或多个镜像节点。所有针对队列的操作(如消息发布)首先在主节点完成,然后同步到所有镜像节点。
- 高可用:当主节点故障时,资历最老的镜像节点会自动提升为新的主节点,服务几乎不中断,从而保证了队列的可用性。
- 配置策略:可以通过策略灵活定义镜像规则,例如:
ha-mode: all
:队列镜像到集群中的所有节点。ha-mode: exactly
:指定镜像节点的确切数量。ha-sync-mode: automatic
:配置自动同步。
3. 仲裁队列
这是 RabbitMQ 3.8 版本后引入的,旨在替代镜像队列的现代实现。
- 核心机制:基于 Raft 一致性算法,实现了强一致性的数据复制,能有效避免脑裂问题。
- 优势:配置更简单,默认内置高可用特性,提供了比传统镜像队列更可靠和高效的数据同步机制
生产规范
5.1 命名规则
组件 | 命名结构 | 示例 | 核心要点 |
---|---|---|---|
Exchange (交换机) | EX.[SourceAppId].[模块名].[业务名1].[业务名2] |
EX.order.payment.status.update |
清晰体现消息来源和业务职责 |
Queue (队列) | MQ.[SourceAppId].[TargetAppId].[业务名1].[业务名2] |
MQ.order.user.notification.levelup |
明确标识消息的生产者(Source)和消费者(Target) |
Routing Key (路由键) | RK.[SourceAppId].[业务名1].[业务名2] 或 [服务名].[消息类型].[动作].[子动作] |
RK.order.payment.success 或 user.notification.levelup |
采用点分层级,便于 Topic 交换机使用通配符 (* , # ) 进行灵活路由 |
5.2 其他
规范领域 | 核心目标 | 关键规范要点 |
---|---|---|
消息可靠性规范 | 确保消息不丢失、不被重复消费 | 持久化、确认机制(ACK)、事务消息、幂等性设计 |
安全规范 | 防止未授权访问和数据泄露 | 认证与授权、传输加密(TLS/SSL)、网络隔离 |
协议与模型规范 | 保证系统兼容性和接口清晰 | 协议选型(AMQP, JMS, MQTT等)、消息模型统一(点对点/发布订阅) |
运维与监控规范 | 保障系统高可用和可维护性 | 集群化部署、监控告警、日志追踪、灾备方案 |
性能与可用性规范 | 应对高并发,保证服务连续性 | 资源规划、流量控制(削峰填谷)、优雅降级 |