当前位置: 首页 > news >正文

发布/订阅(Publish/Subscribe)与交换机(Exchange)

 本章目标

  • 理解交换机(Exchange)在RabbitMQ中的核心作用。

  • 掌握发布/订阅模式(Publish/Subscribe)的实现。

  • 学习扇形交换机(Fanout Exchange)的使用。

  • 理解绑定(Binding)的概念。

  • 实现一个日志广播系统。


一、理论部分

1. 交换机(Exchange)简介

在前面的章节中,我们一直使用默认交换机(空字符串""),生产者直接将消息发送到队列。但RabbitMQ的真正强大之处在于它的交换机机制。

交换机是消息的入口点。生产者将消息发送到交换机,而不是直接发送到队列。交换机根据特定的规则和类型,决定将消息路由到哪些队列中。

2. 交换机类型

RabbitMQ提供了几种不同类型的交换机,每种都有不同的路由行为:

  • 扇形交换机(Fanout Exchange):将消息广播到所有绑定到它的队列中(忽略路由键)。本章重点。

  • 直连交换机(Direct Exchange):根据精确匹配的路由键将消息路由到队列(我们在第2章使用的默认交换机就是直连类型)。

  • 主题交换机(Topic Exchange):基于模式匹配的路由(使用通配符)。

  • 头交换机(Headers Exchange):基于消息头属性而不是路由键进行路由。

3. 发布/订阅模式(Publish/Subscribe)

发布/订阅模式的核心思想是:一条消息被分发给多个消费者。每个消费者都会收到相同的消息副本。这非常适合需要将同一信息通知给多个不同系统的场景,比如:

  • 日志广播系统

  • 新闻推送

  • 系统事件通知

  • 缓存更新通知

4. 绑定(Binding)

绑定是交换机和队列之间的连接。你可以理解为:"这个队列对这个交换机的消息感兴趣"。当我们创建绑定时,可以指定一个绑定键(Binding Key),交换机用它来决定哪些消息应该被路由到这个队列。

对于扇形交换机,绑定键会被忽略,所有绑定到该交换机的队列都会收到消息。


二、实操部分:构建日志广播系统

我们将创建一个日志系统,其中:

  • 一个生产者发送日志消息到扇形交换机。

  • 多个消费者分别将日志保存到文件、打印到控制台等,每个消费者都会收到所有的日志消息。

第1步:创建项目

  1. 创建一个新的解决方案。

  2. 添加一个控制台应用程序项目作为生产者:EmitLog

  3. 添加两个控制台应用程序项目作为消费者:ReceiveLogs(用于控制台输出)和SaveLogs(模拟保存到文件)。实际中可以创建更多消费者。

  4. 为所有项目添加RabbitMQ.Client NuGet包。

第2步:编写日志生产者(EmitLog.cs)

using System.Text;
using RabbitMQ.Client;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 1. 声明一个扇形交换机(Fanout Exchange)// 参数说明:// exchange: "logs" - 交换机的名称// type: "fanout" - 交换机类型// durable: false - 是否持久化(服务器重启后是否存在)// autoDelete: false - 当所有队列都解绑后,是否自动删除交换机channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);// 2. 准备消息(从命令行参数获取或使用默认消息)var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);// 3. 发布消息到交换机(而不是队列!)// 关键变化:指定exchange参数为"logs",而不是空字符串// 对于fanout交换机,routingKey会被忽略,但通常我们还是提供一个有意义的键channel.BasicPublish(exchange: "logs",routingKey: "", // 对于fanout交换机,这个值被忽略basicProperties: null,body: body);Console.WriteLine($" [x] Sent to exchange 'logs': {message}");
}Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();static string GetMessage(string[] args)
{return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!";
}

关键点:

  • 使用ExchangeDeclare方法声明一个名为logs的扇形交换机。

  • BasicPublish中指定exchange: "logs",而不是之前的空字符串。

  • 对于扇形交换机,routingKey参数被忽略,但我们仍然提供它。

第3步:编写第一个日志消费者(ReceiveLogs.cs)- 控制台输出

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 1. 声明扇形交换机(必须与生产者使用相同的交换机名称和类型)channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);// 2. 声明一个临时队列// 关键点:我们不指定队列名称,让RabbitMQ生成一个随机名称// exclusive: true - 当连接关闭时,队列会被自动删除var queueName = channel.QueueDeclare().QueueName;// 3. 将队列绑定到交换机// 对于fanout交换机,bindingKey被忽略(这里用空字符串)
    channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");Console.WriteLine($" [*] Waiting for logs. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] [CONSOLE] {message}");};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

关键点:

  • 使用QueueDeclare()而不指定队列名,让RabbitMQ生成一个唯一的、随机的队列名。

  • 设置exclusive: true(默认值),这样当消费者断开连接时,队列会被自动删除。

  • 使用QueueBind将队列绑定到logs交换机。

第4步:编写第二个日志消费者(SaveLogs.cs)- 模拟保存到文件

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{// 同样的步骤:声明交换机、创建临时队列、绑定channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);var queueName = channel.QueueDeclare().QueueName;channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");Console.WriteLine($" [*] Waiting for logs to save. Queue: {queueName}");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 模拟将日志保存到文件(这里只是打印,实际中可以写入文件)Console.WriteLine($" [x] [FILE] Saved log: {message} - {DateTime.Now:yyyy-MM-dd HH:mm:ss}");// 模拟文件写入的延迟Thread.Sleep(500);};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();
}

这个消费者与第一个几乎相同,只是模拟了不同的处理逻辑(保存到文件)。

第5步:运行与演示

  1. 启动多个消费者
    打开三个终端窗口:

    • 窗口1:运行ReceiveLogs(控制台输出消费者)

    • 窗口2:运行SaveLogs(文件保存消费者)

    • 窗口3:运行ReceiveLogs(另一个控制台输出消费者)

    每个消费者启动时都会显示一个随机生成的队列名,如:

     
    [*] Waiting for logs. Queue: amq.gen-JzTY20BRgKO-HjmUJj0wLg
  2. 查看管理后台
    访问 http://localhost:15672,查看Exchanges标签页:

    • 你会看到新创建的logs交换机,类型为fanout

    • 点击logs交换机,在绑定(Bindings)部分,你会看到3个队列绑定到了这个交换机。

  3. 发送日志消息
    在另一个终端运行EmitLog项目:

    cd EmitLog
    dotnet run "User john.doe logged in successfully"
    dotnet run "Warning: Database connection pool 80% full"
    dotnet run "Error: Payment service timeout after 30 seconds"
  4. 观察现象

    • 所有三个消费者都会收到并处理每一条消息!

    • 消息被广播到了所有绑定到logs交换机的队列。

    • 每个消费者可以以不同的方式处理同一条消息。

  5. 动态测试

    • 在生产者运行期间,启动第四个消费者。

    • 你会发现新启动的消费者只能收到之后发送的消息,而无法收到之前已经发送的消息。这是因为扇形交换机只负责将当前和未来的消息广播给所有绑定的队列。


本章总结

在这一章中,我们深入学习了RabbitMQ的核心组件——交换机,并实现了强大的发布/订阅模式:

  1. 交换机(Exchange):理解了交换机作为消息入口点的作用,以及它与队列的关系。

  2. 扇形交换机(Fanout Exchange):掌握了最简单的交换机类型,它将消息无条件地广播到所有绑定的队列。

  3. 发布/订阅模式:实现了一条消息被多个消费者同时接收的场景。

  4. 临时队列:学习了如何创建匿名队列,用于短暂的发布/订阅场景。

  5. 绑定(Binding):理解了队列和交换机之间的连接关系。

现在你已经能够构建消息广播系统了。在下一章,我们将学习直连交换机(Direct Exchange),实现有选择性的消息路由——让消费者只接收它们感兴趣的消息(比如只处理error级别的日志)。这将使我们的消息系统更加精细和高效。

http://www.hskmm.com/?act=detail&tid=13862

相关文章:

  • 线性结构之链表
  • 二叉树最近公共祖先
  • AI 编程“效率幻觉”:为何你感觉快了,项目却慢了?
  • lc1033-移动石子直到连续
  • 一些正在制作的“格林达姆”测试项目,以及“假无损”
  • 个人项目
  • 北京 意大利学签 北京意大利签证中心 贵宾 vip vfs
  • 第1周
  • 多商家在线客服系统 - 客服用户表设计方案
  • 九月22号
  • 25.9.22 继续MySQL
  • 使用python读取windows注册表
  • 当日总结
  • 3123004481
  • 使用python读取windows日志表
  • 开机RAM分析调试SOP
  • 9.20 模拟赛 T4
  • 2025.9.21 测试 (a1a2a3a4a5)
  • 原码、反码和补码
  • Русский язык
  • 基于Hex Editor Neo的二进制文件模板
  • 【F#学习】字符
  • kubebuilder创建Operator示例
  • 集训总结(八)
  • 使用try-finally结构执行状态重置
  • java03预习
  • x6831卡顿分析
  • 实测对比:权威榜单之微信排版软件Top5(含详细测评)
  • 【F#学习】布尔运算优先级
  • 粘连字符验证码的分割与识别思路