本章目标
-
理解工作队列(竞争消费者模式)的概念和适用场景。
-
掌握消息确认(Acknowledgment)机制,实现可靠的消息处理。
-
学习消息持久化(Durability),防止服务器重启导致消息丢失。
-
使用公平分发(Fair Dispatch)来优化多个消费者的工作效率。
一、理论部分
1. 工作队列(Work Queues / Task Queues)
在第2章的"Hello World"示例中,我们每发送一条消息,就会被一个消费者立即接收。但在实际应用中,我们往往需要处理一些耗时任务(如发送邮件、处理图片、生成报告等)。
工作队列(又称任务队列)的核心思想是避免立即执行资源密集型任务并等待其完成,而是将任务封装为消息并发送到队列中。在后台运行的多个工作进程(消费者)会从队列中取出消息并进行处理。
这种多个消费者从一个队列中获取消息的模式称为竞争消费者模式(Competing Consumers Pattern),它能很容易地实现并行处理,从而横向扩展系统。
2. 消息确认(Message Acknowledgment)
在默认的自动确认(autoAck: true
)模式下,消息一旦被RabbitMQ传递给消费者,就会立即从队列中删除。这有一个严重的问题:如果消费者在处理消息过程中崩溃或断开连接,这条正在处理的消息就会永久丢失,而且无法被其他消费者重新处理。
为了解决这个问题,AMQP提供了消息确认机制:
-
消费者在创建时设置
autoAck: false
(手动确认模式)。 -
当消费者成功处理完一条消息后,它会显式地向RabbitMQ发送一个确认(ACK)。
-
只有在收到ACK后,RabbitMQ才会安全地从队列中删除该消息。
-
如果消费者在处理过程中断开连接(没有发送ACK),RabbitMQ会认为该消息未被成功处理,并将其重新入队,然后传递给另一个消费者(如果存在)。
这种机制确保了即使消费者偶尔死亡,消息也不会丢失。
3. 消息持久化(Message Durability)
消息确认机制保护了消息在消费者处理时不丢失。但如果RabbitMQ服务器本身停止或崩溃了呢?默认情况下,RabbitMQ退出或崩溃时,它会忘记所有的队列和消息。
为了确保消息在服务器重启后仍然存在,我们需要做两件事:
-
将队列声明为持久的(Durable):这样队列本身会在服务器重启后继续存在。
-
将消息标记为持久的(Persistent):在发布消息时,设置
IBasicProperties.Persistent = true
。
注意:将消息标记为
Persistent
并不能完全保证消息永不丢失。虽然RabbitMQ会将消息保存到磁盘,但在它接收到消息和保存到磁盘之间仍然有一个很短的时间窗口。对于更强的保证,需要使用发布者确认(Publisher Confirms),这将在后续章节介绍。
4. 公平分发(Fair Dispatch)
默认情况下,RabbitMQ会使用轮询(Round-robin) 的方式将消息平均分发给所有消费者,而不考虑每个消费者当前未确认的消息数量。这可能导致一个问题:某些消息处理起来很耗时,而某些很快。如果一个繁忙的消费者前面堆积了很多未确认的消息,而空闲的消费者却得不到新任务,就会造成处理能力浪费。
为了解决这个问题,我们可以使用 basicQos
方法并设置 prefetchCount = 1
。这告诉RabbitMQ不要一次向一个消费者发送超过一条消息。或者换句话说,在消费者处理并确认上一条消息之前,不要向其发送新消息。这样,RabbitMQ会将新消息分发给下一个空闲的消费者。
二、实操部分:构建可靠的工作队列
我们将创建一个任务发布者(NewTask
)和多个工作者(Worker
)。任务消息中的点号.
数量代表其处理复杂度(每个点号耗时1秒)。
第1步:创建项目
-
创建一个新的解决方案。
-
添加两个控制台应用程序项目:
NewTask
(生产者) 和Worker
(消费者)。 -
为两个项目添加
RabbitMQ.Client
NuGet包。
第2步:编写可靠的任务生产者(NewTask.cs)
using System.Text;
using RabbitMQ.Client;// 示例: dotnet run "Message."
// dotnet run "Message.."
// dotnet run "Message..." ...
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 1. 声明一个持久化的队列channel.QueueDeclare(queue: "task_queue",durable: true, // 队列持久化exclusive: false,autoDelete: false,arguments: null);// 2. 准备消息var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);// 3. 设置消息属性为持久化var properties = channel.CreateBasicProperties();properties.Persistent = true;// 4. 发布消息channel.BasicPublish(exchange: "",routingKey: "task_queue",basicProperties: properties, // 传入持久化属性body: body);Console.WriteLine($" [x] Sent {message}");
}Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();static string GetMessage(string[] args)
{return args.Length > 0 ? string.Join(" ", args) : "Hello World!";
}
关键更改:
-
QueueDeclare
中的durable: true
确保队列在服务器重启后依然存在。 -
创建了
IBasicProperties
对象并设置Persistent = true
,使消息本身也被标记为持久化。
第3步:编写可靠的工作者(Worker.cs)
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 声明持久化队列(必须与生产者声明参数一致)channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);// !!! 关键设置:公平分发 !!!// 告诉RabbitMQ,在当前工作者处理并确认上一条消息之前,不要向其发送新消息channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] Received {message}");// 模拟耗时任务,消息中的每个点号'.'代表1秒工作int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");// !!! 手动发送消息确认(ACK) !!!// 只有在任务处理完成后,才发送ACK,告知RabbitMQ可以安全删除消息channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};// 启动消费者,设置 autoAck: false (手动确认模式)channel.BasicConsume(queue: "task_queue",autoAck: false, // 关闭自动确认!consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}
关键更改:
-
channel.BasicQos(0, 1, false)
: 设置公平分发,每个消费者一次只预取一条消息。 -
channel.BasicConsume(autoAck: false)
: 切换到手动确认模式。 -
在
Received
事件处理程序的最后,调用channel.BasicAck(...)
来显式确认消息处理完成。ea.DeliveryTag
是消息的唯一标识符。 -
使用
Thread.Sleep
模拟耗时任务。
第4步:运行与演示
-
启动两个(或多个)工作者(Worker)
打开两个终端窗口,分别运行Worker
项目。cd Worker dotnet run
两个窗口都会显示
[*] Waiting for messages.
。 -
发送任务
运行NewTask
项目来发送一些耗时不同的任务。cd NewTask dotnet run "First message." # 耗时约1秒 dotnet run "Second message.." # 耗时约2秒 dotnet run "Third message..." # 耗时约3秒 dotnet run "Fourth message...." # 耗时约4秒 dotnet run "Fifth message....." # 耗时约5秒
-
观察现象
-
你会看到任务被轮流分配给两个工作者(轮询分发)。
-
但是,由于我们设置了
prefetchCount=1
,当一个工作者正在处理一个长任务(例如5秒)时,RabbitMQ不会再给它发送新消息,而是会将新消息分发给另一个空闲的工作者。这就是公平分发的效果。 -
查看管理后台(Queues),你会看到 "Unacked"(未确认)消息的数量。只有当工作者调用
BasicAck
后,这个消息才会消失。
-
-
演示消息确认的重要性
-
让一个工作者正在处理一个长任务(比如5秒的任务)。
-
在它处理过程中,强制关闭这个工作者的终端窗口(模拟消费者崩溃)。
-
观察另一个工作者窗口和管理后台:刚才那条被中断处理的消息(状态为Unacked)会重新变为Ready,并被自动传递给另一个仍在运行的工作者进行处理。这样就保证了消息绝不会因为消费者崩溃而丢失。
-
本章总结
在这一章中,我们构建了一个可靠的工作队列系统,并深入学习了RabbitMQ的核心可靠性机制:
-
工作队列模式:使用多个消费者并行处理耗时任务。
-
消息确认(ACK):通过手动确认(
autoAck: false
)和BasicAck
,确保消息只有在被成功处理后才会被删除,防止消费者崩溃导致消息丢失。 -
消息与队列持久化:通过
durable: true
和properties.Persistent = true
,防止RabbitMQ服务器重启导致消息丢失。 -
公平分发(QoS):通过
BasicQos
和prefetchCount: 1
,优化任务分发,使空闲的消费者能优先获得新任务,提高整体处理效率。
现在,你已经能够构建一个健壮的、用于处理后台任务的分布式系统了。在下一章,我们将离开简单的队列模型,探索RabbitMQ更强大的功能——交换机(Exchange),学习如何实现发布/订阅模式,将一条消息投递给多个消费者。