本章目标
-
理解交换机(Exchange)在RabbitMQ中的核心作用。
-
掌握发布/订阅模式(Publish/Subscribe)的实现。
-
学习扇形交换机(Fanout Exchange)的使用。
-
理解绑定(Binding)的概念。
-
实现一个日志广播系统。
一、理论部分
1. 交换机(Exchange)简介
在前面的章节中,我们一直使用默认交换机(空字符串""
),生产者直接将消息发送到队列。但RabbitMQ的真正强大之处在于它的交换机机制。
交换机是消息的入口点。生产者将消息发送到交换机,而不是直接发送到队列。交换机根据特定的规则和类型,决定将消息路由到哪些队列中。
2. 交换机类型
RabbitMQ提供了几种不同类型的交换机,每种都有不同的路由行为:
-
扇形交换机(Fanout Exchange):将消息广播到所有绑定到它的队列中(忽略路由键)。本章重点。
-
直连交换机(Direct Exchange):根据精确匹配的路由键将消息路由到队列(我们在第2章使用的默认交换机就是直连类型)。
-
主题交换机(Topic Exchange):基于模式匹配的路由(使用通配符)。
-
头交换机(Headers Exchange):基于消息头属性而不是路由键进行路由。
3. 发布/订阅模式(Publish/Subscribe)
发布/订阅模式的核心思想是:一条消息被分发给多个消费者。每个消费者都会收到相同的消息副本。这非常适合需要将同一信息通知给多个不同系统的场景,比如:
-
日志广播系统
-
新闻推送
-
系统事件通知
-
缓存更新通知
4. 绑定(Binding)
绑定是交换机和队列之间的连接。你可以理解为:"这个队列对这个交换机的消息感兴趣"。当我们创建绑定时,可以指定一个绑定键(Binding Key),交换机用它来决定哪些消息应该被路由到这个队列。
对于扇形交换机,绑定键会被忽略,所有绑定到该交换机的队列都会收到消息。
二、实操部分:构建日志广播系统
我们将创建一个日志系统,其中:
第1步:创建项目
-
创建一个新的解决方案。
-
添加一个控制台应用程序项目作为生产者:EmitLog
。
-
添加两个控制台应用程序项目作为消费者:ReceiveLogs
(用于控制台输出)和SaveLogs
(模拟保存到文件)。实际中可以创建更多消费者。
-
为所有项目添加RabbitMQ.Client
NuGet包。
第2步:编写日志生产者(EmitLog.cs)
using System.Text;
using RabbitMQ.Client;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 1. 声明一个扇形交换机(Fanout Exchange)// 参数说明:// exchange: "logs" - 交换机的名称// type: "fanout" - 交换机类型// durable: false - 是否持久化(服务器重启后是否存在)// autoDelete: false - 当所有队列都解绑后,是否自动删除交换机channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);// 2. 准备消息(从命令行参数获取或使用默认消息)var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);// 3. 发布消息到交换机(而不是队列!)// 关键变化:指定exchange参数为"logs",而不是空字符串// 对于fanout交换机,routingKey会被忽略,但通常我们还是提供一个有意义的键channel.BasicPublish(exchange: "logs",routingKey: "", // 对于fanout交换机,这个值被忽略basicProperties: null,body: body);Console.WriteLine($" [x] Sent to exchange 'logs': {message}");
}Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();static string GetMessage(string[] args)
{return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!";
}
关键点:
-
使用ExchangeDeclare
方法声明一个名为logs
的扇形交换机。
-
在BasicPublish
中指定exchange: "logs"
,而不是之前的空字符串。
-
对于扇形交换机,routingKey
参数被忽略,但我们仍然提供它。
第3步:编写第一个日志消费者(ReceiveLogs.cs)- 控制台输出
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 1. 声明扇形交换机(必须与生产者使用相同的交换机名称和类型)channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);// 2. 声明一个临时队列// 关键点:我们不指定队列名称,让RabbitMQ生成一个随机名称// exclusive: true - 当连接关闭时,队列会被自动删除var queueName = channel.QueueDeclare().QueueName;// 3. 将队列绑定到交换机// 对于fanout交换机,bindingKey被忽略(这里用空字符串)
channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");Console.WriteLine($" [*] Waiting for logs. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [CONSOLE] {message}");};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}
关键点:
-
使用QueueDeclare()
而不指定队列名,让RabbitMQ生成一个唯一的、随机的队列名。
-
设置exclusive: true
(默认值),这样当消费者断开连接时,队列会被自动删除。
-
使用QueueBind
将队列绑定到logs
交换机。
第4步:编写第二个日志消费者(SaveLogs.cs)- 模拟保存到文件
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 同样的步骤:声明交换机、创建临时队列、绑定channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);var queueName = channel.QueueDeclare().QueueName;channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");Console.WriteLine($" [*] Waiting for logs to save. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 模拟将日志保存到文件(这里只是打印,实际中可以写入文件)Console.WriteLine($" [x] [FILE] Saved log: {message} - {DateTime.Now:yyyy-MM-dd HH:mm:ss}");// 模拟文件写入的延迟Thread.Sleep(500);};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}
这个消费者与第一个几乎相同,只是模拟了不同的处理逻辑(保存到文件)。
第5步:运行与演示
-
启动多个消费者
打开三个终端窗口:
每个消费者启动时都会显示一个随机生成的队列名,如:
[*] Waiting for logs. Queue: amq.gen-JzTY20BRgKO-HjmUJj0wLg
-
查看管理后台
访问 http://localhost:15672,查看Exchanges标签页:
-
发送日志消息
在另一个终端运行EmitLog
项目:
cd EmitLog
dotnet run "User john.doe logged in successfully"
dotnet run "Warning: Database connection pool 80% full"
dotnet run "Error: Payment service timeout after 30 seconds"
-
观察现象
-
所有三个消费者都会收到并处理每一条消息!
-
消息被广播到了所有绑定到logs
交换机的队列。
-
每个消费者可以以不同的方式处理同一条消息。
-
动态测试
本章总结
在这一章中,我们深入学习了RabbitMQ的核心组件——交换机,并实现了强大的发布/订阅模式:
-
交换机(Exchange):理解了交换机作为消息入口点的作用,以及它与队列的关系。
-
扇形交换机(Fanout Exchange):掌握了最简单的交换机类型,它将消息无条件地广播到所有绑定的队列。
-
发布/订阅模式:实现了一条消息被多个消费者同时接收的场景。
-
临时队列:学习了如何创建匿名队列,用于短暂的发布/订阅场景。
-
绑定(Binding):理解了队列和交换机之间的连接关系。
现在你已经能够构建消息广播系统了。在下一章,我们将学习直连交换机(Direct Exchange),实现有选择性的消息路由——让消费者只接收它们感兴趣的消息(比如只处理error级别的日志)。这将使我们的消息系统更加精细和高效。