本章目标
一、理论部分
1. 主题交换机(Topic Exchange)简介
主题交换机是RabbitMQ中最灵活也是最强大的交换机类型。它结合了扇形交换机的广播能力和直连交换机的精确匹配能力,同时引入了模式匹配的概念。
主题交换机的工作方式:
2. 通配符规则
主题交换机的强大之处在于绑定键支持通配符:
-
*
(星号):匹配恰好一个单词
-
#
(井号):匹配零个或多个单词
3. 路由键格式最佳实践
路由键通常采用层次结构,便于模式匹配:
-
<facility>.<severity>
:auth.info
、kernel.error
-
<region>.<service>.<event>
:usa.payment.success
、europe.order.cancelled
-
<category>.<subcategory>.<action>
:news.sports.update
、weather.alert.severe
4. 使用场景
主题交换机适用于需要复杂、灵活的消息路由场景:
二、实操部分:构建智能新闻分发系统
我们将构建一个新闻分发系统,其中:
-
生产者发送带有分类路由键的新闻消息
-
消费者可以根据兴趣订阅特定模式的新闻
第1步:创建项目
-
创建一个新的解决方案。
-
添加一个控制台应用程序项目作为生产者:EmitLogTopic
。
-
添加多个消费者项目:
-
ReceiveNewsAll
- 接收所有新闻
-
ReceiveSportsNews
- 接收所有体育新闻
-
ReceiveUSNews
- 接收所有美国新闻
-
ReceiveCriticalAlerts
- 接收所有紧急警报
-
ReceiveWeatherUpdates
- 接收所有天气更新
-
为所有项目添加RabbitMQ.Client
NuGet包。
第2步:编写新闻生产者(EmitLogTopic.cs)
using System.Text;
using RabbitMQ.Client;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 声明主题交换机channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);// 路由键格式:<category>.<region>.<severity>// 示例:news.usa.info, sports.europe.alert, weather.asia.criticalvar routingKey = (args.Length > 0) ? args[0] : "anonymous.info";var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "topic_logs",routingKey: routingKey,basicProperties: null,body: body);Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");
}Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
第3步:编写接收所有新闻的消费者(ReceiveNewsAll.cs)
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);var queueName = channel.QueueDeclare().QueueName;// 使用 # 匹配所有消息channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "#");Console.WriteLine($" [*] Waiting for ALL news. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [ALL] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}
第4步:编写接收体育新闻的消费者(ReceiveSportsNews.cs)
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);var queueName = channel.QueueDeclare().QueueName;// 匹配所有体育相关的新闻:sports.*.*channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "sports.#");Console.WriteLine($" [*] Waiting for SPORTS news. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [SPORTS] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}
第5步:编写接收美国新闻的消费者(ReceiveUSNews.cs)
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);var queueName = channel.QueueDeclare().QueueName;// 匹配所有美国相关的新闻:*.usa.*channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.usa.*");Console.WriteLine($" [*] Waiting for USA news. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [USA] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}
第6步:编写接收紧急警报的消费者(ReceiveCriticalAlerts.cs)
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);var queueName = channel.QueueDeclare().QueueName;// 匹配所有紧急级别的消息:*.*.criticalchannel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.*.critical");Console.WriteLine($" [*] Waiting for CRITICAL alerts. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [CRITICAL] Received '{ea.RoutingKey}':'{message}'");Console.WriteLine(" -> Sending emergency notification!");};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}
第7步:编写接收天气更新的消费者(ReceiveWeatherUpdates.cs)
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);var queueName = channel.QueueDeclare().QueueName;// 匹配所有天气相关的更新:weather.*// 一个队列可以绑定多个模式channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "weather.#");channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.alert"); // 也接收所有警报
Console.WriteLine($" [*] Waiting for WEATHER updates and ALERTS. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [WEATHER/ALERT] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}
第8步:运行与演示
-
启动所有消费者
打开六个终端窗口,分别运行所有消费者程序。
-
发送各种类型的新闻消息
cd EmitLogTopic# 发送体育新闻
dotnet run "sports.usa.score" "Team USA wins gold medal"
dotnet run "sports.europe.update" "Champions League finals scheduled"# 发送美国相关新闻
dotnet run "news.usa.politics" "Election results announced"
dotnet run "tech.usa.innovation" "Silicon Valley startup raises $10M"# 发送紧急警报
dotnet run "weather.usa.critical" "Tornado warning for Midwest"
dotnet run "safety.europe.critical" "Security alert: System maintenance"# 发送天气更新
dotnet run "weather.asia.update" "Monsoon season begins"
dotnet run "news.europe.alert" "Breaking: Major announcement"# 发送其他消息
dotnet run "entertainment.hollywood.gossip" "Celebrity wedding announced"
-
观察路由结果并分析模式匹配
-
测试复杂场景
第9步:通配符规则详解示例
为了更好理解通配符,让我们看一些匹配示例:
绑定键 *.orange.*
的匹配情况:
-
✅ quick.orange.rabbit
(匹配)
-
✅ lazy.orange.elephant
(匹配)
-
❌ quick.orange.fox.lazy
(不匹配 - 四个单词)
-
❌ orange
(不匹配 - 只有一个单词)
-
❌ quick.brown.fox
(不匹配 - 中间不是orange)
绑定键 lazy.#
的匹配情况:
本章总结
在这一章中,我们深入学习了RabbitMQ中最强大的主题交换机,掌握了基于模式匹配的复杂消息路由:
-
主题交换机(Topic Exchange):理解了基于通配符的模式匹配路由机制。
-
通配符规则:掌握了*
(匹配一个单词)和#
(匹配零个或多个单词)的使用方法。
-
路由键设计:学习了使用点号分隔的层次化路由键设计最佳实践。
-
复杂路由场景:实现了支持多维度过滤的智能新闻分发系统。
-
多重模式绑定:掌握了单个队列绑定多个模式的高级用法。
主题交换机提供了无与伦比的灵活性,是构建复杂事件驱动系统的理想选择。在下一章,我们将转向另一个重要主题:消息可靠性保障,学习如何确保消息在复杂的分布式环境中绝不丢失。