当前位置: 首页 > news >正文

第5章:路由(Routing)与直连交换机(Direct Exchange)

本章目标

  • 理解直连交换机(Direct Exchange)的工作原理。

  • 掌握基于路由键(Routing Key)的消息过滤机制。

  • 学习多重绑定(Multiple Bindings)的概念。

  • 实现一个可以根据日志级别进行过滤的智能日志系统。


一、理论部分

1. 直连交换机(Direct Exchange)

直连交换机是比扇形交换机更智能的路由器。它的工作逻辑很简单:

  • 消息带着一个路由键(Routing Key) 被发送到直连交换机。

  • 队列通过一个绑定键(Binding Key) 绑定到交换机。

  • 当绑定键与路由键完全匹配时,消息就会被路由到该队列。

2. 路由键(Routing Key)与绑定键(Binding Key)

  • 路由键(Routing Key):生产者发送消息时指定的一个字符串,用于描述消息的特征或类型。

  • 绑定键(Binding Key):消费者在将队列绑定到交换机时指定的字符串,用于声明该队列对哪些消息感兴趣。

对于直连交换机,路由键和绑定键必须精确匹配(完全相等)。

3. 多重绑定(Multiple Bindings)

一个队列可以绑定多个绑定键,同样,多个队列也可以用相同的绑定键绑定到同一个交换机。这提供了很大的灵活性:

  • 一个队列,多个兴趣:队列Q1可以同时绑定errorwarning键,接收所有错误和警告消息。

  • 多个队列,相同兴趣:队列Q1和Q2都可以绑定info键,这样它们都会收到所有信息级别的消息(类似于扇形交换机的效果,但仅限于特定路由键)。

4. 使用场景

直连交换机非常适合需要有选择性接收消息的场景,比如:

  • 日志级别过滤:只接收特定级别(error、warning、info)的日志。

  • 业务类型路由:根据消息类型(order.created、payment.processed)路由到不同的处理服务。

  • 优先级处理:将高优先级任务路由到专用队列。


二、实操部分:构建智能日志系统

我们将改进第4章的日志系统,使其能够根据日志级别(error、warning、info)进行智能路由。

第1步:创建项目

  1. 创建一个新的解决方案。

  2. 添加一个控制台应用程序项目作为生产者:EmitLogDirect

  3. 添加三个控制台应用程序项目作为消费者:

    • ReceiveLogsDirect - 接收所有日志

    • ReceiveWarningsAndErrors - 只接收警告和错误

    • ReceiveErrorsOnly - 只接收错误

  4. 为所有项目添加RabbitMQ.Client NuGet包。

第2步:编写智能日志生产者(EmitLogDirect.cs)

using System.Text;
using RabbitMQ.Client;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 1. 声明一个直连交换机(Direct Exchange)channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 2. 从命令行参数获取日志级别和消息内容// 用法:dotnet run [severity] [message]// 示例:dotnet run error "Database connection failed"var severity = (args.Length > 0) ? args[0] : "info";var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";var body = Encoding.UTF8.GetBytes(message);// 3. 发布消息到直连交换机,并指定路由键(日志级别)channel.BasicPublish(exchange: "direct_logs",routingKey: severity, // 关键:使用日志级别作为路由键basicProperties: null,body: body);Console.WriteLine($" [x] Sent '{severity}':'{message}'");
}Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

关键点:

  • 交换机类型改为ExchangeType.Direct

  • 使用命令行第一个参数作为路由键(日志级别)。

  • BasicPublish中明确指定routingKey参数。

第3步:编写接收所有日志的消费者(ReceiveLogsDirect.cs)

这个消费者会绑定三个不同的绑定键,从而接收所有级别的日志。

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 声明直连交换机channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);var queueName = channel.QueueDeclare().QueueName;// 绑定到三个日志级别:接收所有消息// 注意:一个队列可以绑定多个路由键channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "info");channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "warning");channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "error");Console.WriteLine($" [*] Waiting for ALL logs (info, warning, error). Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [ALL] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

关键点:

  • 一个队列通过多次调用QueueBind绑定了三个不同的路由键。

  • 通过ea.RoutingKey可以获取消息的实际路由键。

第4步:编写接收警告和错误的消费者(ReceiveWarningsAndErrors.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);var queueName = channel.QueueDeclare().QueueName;// 只绑定到warning和error级别channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "warning");channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "error");Console.WriteLine($" [*] Waiting for WARNINGS and ERRORS only. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [WARN+ERR] Received '{ea.RoutingKey}':'{message}'");};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

第5步:编写只接收错误的消费者(ReceiveErrorsOnly.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);var queueName = channel.QueueDeclare().QueueName;// 只绑定到error级别channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "error");Console.WriteLine($" [*] Waiting for ERRORS only. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [ERRORS ONLY] Received '{ea.RoutingKey}':'{message}'");// 模拟错误处理(比如发送警报邮件)Console.WriteLine("    -> Sending alert email to admin...");Thread.Sleep(1000);};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

第6步:运行与演示

  1. 启动所有消费者
    打开四个终端窗口,分别运行:

    # 终端1 - 接收所有日志
    cd ReceiveLogsDirect
    dotnet run# 终端2 - 接收警告和错误
    cd ReceiveWarningsAndErrors  
    dotnet run# 终端3 - 只接收错误
    cd ReceiveErrorsOnly
    dotnet run# 终端4 - 用于发送消息
    cd EmitLogDirect
  2. 查看管理后台
    访问 http://localhost:15672,查看direct_logs交换机的绑定情况:

    • 你应该看到三个队列,每个队列有不同的绑定键组合。

    • ReceiveLogsDirect的队列绑定了三个键。

    • ReceiveWarningsAndErrors的队列绑定了两个键。

    • ReceiveErrorsOnly的队列只绑定了一个键。

  3. 发送不同级别的日志消息

     
     
    # 发送info消息
    dotnet run info "User login successful"# 发送warning消息  
    dotnet run warning "Database connection slow"# 发送error消息
    dotnet run error "Payment service unavailable"# 发送另一个info消息
    dotnet run info "Cache updated successfully"
  4. 观察路由结果
    观察各个消费者的输出:

    • 信息级别(info)消息:

      • ReceiveLogsDirect:✅ 接收(因为它绑定了info)

      • ReceiveWarningsAndErrors:❌ 不接收

      • ReceiveErrorsOnly:❌ 不接收

    • 警告级别(warning)消息:

      • ReceiveLogsDirect:✅ 接收

      • ReceiveWarningsAndErrors:✅ 接收

      • ReceiveErrorsOnly:❌ 不接收

    • 错误级别(error)消息:

      • ReceiveLogsDirect:✅ 接收

      • ReceiveWarningsAndErrors:✅ 接收

      • ReceiveErrorsOnly:✅ 接收(并模拟发送警报)

  5. 测试多重绑定效果
    再启动一个ReceiveErrorsOnly消费者(第二个实例):

     
     
    cd ReceiveErrorsOnly
    dotnet run

    发送一条error消息,你会发现两个ReceiveErrorsOnly实例都会收到消息,因为它们都使用相同的绑定键绑定到了同一个交换机。这展示了直连交换机也支持"一个路由键,多个队列"的广播式路由。


本章总结

在这一章中,我们学习了比扇形交换机更智能的直连交换机,并实现了精确的消息路由:

  1. 直连交换机(Direct Exchange):基于路由键和绑定键的精确匹配进行消息路由。

  2. 路由键与绑定键:理解了生产者指定路由键、消费者指定绑定键的分工。

  3. 多重绑定:掌握了队列可以绑定多个路由键,以及多个队列可以绑定相同路由键的灵活配置。

  4. 选择性消息消费:实现了消费者只接收特定类型消息的智能日志系统。

直连交换机提供了精确的路由控制,但有时候我们需要更灵活的模式匹配。在下一章,我们将学习功能最强大的主题交换机(Topic Exchange),它支持通配符匹配,能够实现极其复杂的消息路由规则。

http://www.hskmm.com/?act=detail&tid=15181

相关文章:

  • 搜索百科(4):OpenSearch — 开源搜索的新选择
  • JAVA的计算方式
  • 安装 elasticsearch-9.1.4 - 集群 和 kibana-9.1.4
  • 反码 原码 补码
  • 线性结构常见应用之栈[基于郝斌课程]
  • 实测对比:权威榜单之公众号排版Top 5(含效果对比与适用建议)
  • go的泛型
  • 原码补码反码
  • lc1034-边界着色
  • 【汽车电子】汽车功能安全标准 ISO 26262
  • ISO 26262的不同安全等级:ASIL-D ASIL-C ASIL-B ASIL-A
  • C#学习1
  • 02020405 EF Core基础05-EF Core反向工程、EF Core和ADO.NET Core的联系、EF Core无法做到的事情
  • 02020406 EF Core基础06-EF Core生成的SQL
  • Gemini-2.5-Flash-Image-Preview 与 GPT-4o 图像生成能力技术差异解析​ - 教程
  • 新学期每日总结(第2天)
  • 在CodeBolcks下wxSmith的C++编程教程——使用菜单和组件
  • 单调队列
  • 软工第一次编程
  • 第二次软工作业
  • 9.23总结
  • 日志|力扣|不同路径|最小路径和|动态规划|Javase|IO|File|Javaweb
  • 如何建立 5 μm 精度的视觉检测?不仅仅是相机的事
  • 函数 cmd_info_change_cur_model_group
  • 线程--相关概念、两种创建线程的方式
  • 恢复某个数据文件不适当,导致DataGuard无法open数据库
  • Nginx 部署及配置
  • vite静态资源处理
  • 洛谷B4040 [GESP202409 四级] 黑白方块 题解
  • SerpApi:一站式搜索引擎数据抓取API完全指南