本章目标
-
理解直连交换机(Direct Exchange)的工作原理。
-
掌握基于路由键(Routing Key)的消息过滤机制。
-
学习多重绑定(Multiple Bindings)的概念。
-
实现一个可以根据日志级别进行过滤的智能日志系统。
一、理论部分
1. 直连交换机(Direct Exchange)
直连交换机是比扇形交换机更智能的路由器。它的工作逻辑很简单:
-
消息带着一个路由键(Routing Key) 被发送到直连交换机。
-
队列通过一个绑定键(Binding Key) 绑定到交换机。
-
当绑定键与路由键完全匹配时,消息就会被路由到该队列。
2. 路由键(Routing Key)与绑定键(Binding Key)
-
路由键(Routing Key):生产者发送消息时指定的一个字符串,用于描述消息的特征或类型。
-
绑定键(Binding Key):消费者在将队列绑定到交换机时指定的字符串,用于声明该队列对哪些消息感兴趣。
对于直连交换机,路由键和绑定键必须精确匹配(完全相等)。
3. 多重绑定(Multiple Bindings)
一个队列可以绑定多个绑定键,同样,多个队列也可以用相同的绑定键绑定到同一个交换机。这提供了很大的灵活性:
-
一个队列,多个兴趣:队列Q1可以同时绑定
error
和warning
键,接收所有错误和警告消息。 -
多个队列,相同兴趣:队列Q1和Q2都可以绑定
info
键,这样它们都会收到所有信息级别的消息(类似于扇形交换机的效果,但仅限于特定路由键)。
4. 使用场景
直连交换机非常适合需要有选择性接收消息的场景,比如:
-
日志级别过滤:只接收特定级别(error、warning、info)的日志。
-
业务类型路由:根据消息类型(order.created、payment.processed)路由到不同的处理服务。
-
优先级处理:将高优先级任务路由到专用队列。
二、实操部分:构建智能日志系统
我们将改进第4章的日志系统,使其能够根据日志级别(error、warning、info)进行智能路由。
第1步:创建项目
-
创建一个新的解决方案。
-
添加一个控制台应用程序项目作为生产者:
EmitLogDirect
。 -
添加三个控制台应用程序项目作为消费者:
-
ReceiveLogsDirect
- 接收所有日志 -
ReceiveWarningsAndErrors
- 只接收警告和错误 -
ReceiveErrorsOnly
- 只接收错误
-
-
为所有项目添加
RabbitMQ.Client
NuGet包。
第2步:编写智能日志生产者(EmitLogDirect.cs)
using System.Text; using RabbitMQ.Client;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) {// 1. 声明一个直连交换机(Direct Exchange)channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 2. 从命令行参数获取日志级别和消息内容// 用法:dotnet run [severity] [message]// 示例:dotnet run error "Database connection failed"var severity = (args.Length > 0) ? args[0] : "info";var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";var body = Encoding.UTF8.GetBytes(message);// 3. 发布消息到直连交换机,并指定路由键(日志级别)channel.BasicPublish(exchange: "direct_logs",routingKey: severity, // 关键:使用日志级别作为路由键basicProperties: null,body: body);Console.WriteLine($" [x] Sent '{severity}':'{message}'"); }Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
关键点:
-
交换机类型改为
ExchangeType.Direct
。 -
使用命令行第一个参数作为路由键(日志级别)。
-
在
BasicPublish
中明确指定routingKey
参数。
第3步:编写接收所有日志的消费者(ReceiveLogsDirect.cs)
这个消费者会绑定三个不同的绑定键,从而接收所有级别的日志。
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) {// 声明直连交换机channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);var queueName = channel.QueueDeclare().QueueName;// 绑定到三个日志级别:接收所有消息// 注意:一个队列可以绑定多个路由键channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "info");channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "warning");channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "error");Console.WriteLine($" [*] Waiting for ALL logs (info, warning, error). Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [ALL] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine(); }
关键点:
-
一个队列通过多次调用
QueueBind
绑定了三个不同的路由键。 -
通过
ea.RoutingKey
可以获取消息的实际路由键。
第4步:编写接收警告和错误的消费者(ReceiveWarningsAndErrors.cs)
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) {channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);var queueName = channel.QueueDeclare().QueueName;// 只绑定到warning和error级别channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "warning");channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "error");Console.WriteLine($" [*] Waiting for WARNINGS and ERRORS only. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [WARN+ERR] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine(); }
第5步:编写只接收错误的消费者(ReceiveErrorsOnly.cs)
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) {channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);var queueName = channel.QueueDeclare().QueueName;// 只绑定到error级别channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "error");Console.WriteLine($" [*] Waiting for ERRORS only. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [ERRORS ONLY] Received '{ea.RoutingKey}':'{message}'");// 模拟错误处理(比如发送警报邮件)Console.WriteLine(" -> Sending alert email to admin...");Thread.Sleep(1000);};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine(); }
第6步:运行与演示
-
启动所有消费者
打开四个终端窗口,分别运行: -
查看管理后台
访问 http://localhost:15672,查看direct_logs
交换机的绑定情况:-
你应该看到三个队列,每个队列有不同的绑定键组合。
-
ReceiveLogsDirect
的队列绑定了三个键。 -
ReceiveWarningsAndErrors
的队列绑定了两个键。 -
ReceiveErrorsOnly
的队列只绑定了一个键。
-
-
发送不同级别的日志消息
# 发送info消息 dotnet run info "User login successful"# 发送warning消息 dotnet run warning "Database connection slow"# 发送error消息 dotnet run error "Payment service unavailable"# 发送另一个info消息 dotnet run info "Cache updated successfully"
-
观察路由结果
观察各个消费者的输出:-
信息级别(info)消息:
-
ReceiveLogsDirect
:✅ 接收(因为它绑定了info) -
ReceiveWarningsAndErrors
:❌ 不接收 -
ReceiveErrorsOnly
:❌ 不接收
-
-
警告级别(warning)消息:
-
ReceiveLogsDirect
:✅ 接收 -
ReceiveWarningsAndErrors
:✅ 接收 -
ReceiveErrorsOnly
:❌ 不接收
-
-
错误级别(error)消息:
-
ReceiveLogsDirect
:✅ 接收 -
ReceiveWarningsAndErrors
:✅ 接收 -
ReceiveErrorsOnly
:✅ 接收(并模拟发送警报)
-
-
-
测试多重绑定效果
再启动一个ReceiveErrorsOnly
消费者(第二个实例):cd ReceiveErrorsOnly dotnet run
发送一条error消息,你会发现两个
ReceiveErrorsOnly
实例都会收到消息,因为它们都使用相同的绑定键绑定到了同一个交换机。这展示了直连交换机也支持"一个路由键,多个队列"的广播式路由。
本章总结
在这一章中,我们学习了比扇形交换机更智能的直连交换机,并实现了精确的消息路由:
-
直连交换机(Direct Exchange):基于路由键和绑定键的精确匹配进行消息路由。
-
路由键与绑定键:理解了生产者指定路由键、消费者指定绑定键的分工。
-
多重绑定:掌握了队列可以绑定多个路由键,以及多个队列可以绑定相同路由键的灵活配置。
-
选择性消息消费:实现了消费者只接收特定类型消息的智能日志系统。
直连交换机提供了精确的路由控制,但有时候我们需要更灵活的模式匹配。在下一章,我们将学习功能最强大的主题交换机(Topic Exchange),它支持通配符匹配,能够实现极其复杂的消息路由规则。