当前位置: 首页 > news >正文

11_Reactor网络模型

一、Reactor模型基本原理

Reactor模型是一种基于事件驱动的设计模式,特别适合处理高并发的I/O密集型应用。

image

Reactor模型的核心思想很简单,但又很巧妙,它围绕着"事件"展开。不同于传统模型中线程主动等待I/O完成,Reactor模型采用了完全不同的思路:

  1. 有一个事件分离器(通常是I/O多路复用技术如select、poll或epoll)监听所有连接上的事件;
  2. 当有事件发生时,事件分离器通知事件处理器;
  3. 事件处理器负责处理对应的事件,如接受新连接、读数据、写数据等;
  4. 所有操作都在事件循环中进行,无需阻塞等待。

这种设计的最大特点是:单线程可以处理多个连接,而且只在有事件发生时才会处理,极大地提高了CPU利用率。

Reactor模型的核心组件:

  • 事件源(Event Source):产生事件的对象,如socket连接
  • 事件多路分离器(Event Demultiplexer):I/O多路复用机制,如select/poll/epoll
  • 事件分发器(Dispatcher):将事件分发给对应的处理器
  • 事件处理器(Event Handler):具体处理不同类型事件的逻辑

优点:

  • 高效处理I/O:避免了阻塞等待,一个线程可以处理多个连接
  • 可扩展性好:可以根据需要增加Reactor线程数量
  • 编程模型清晰:基于事件驱动,职责分明

缺点:

  • 复杂性:比传统模型复杂,调试也更困难
  • 对长时间操作不友好:如果某个事件处理器占用太长时间,会阻塞其他事件的处理

运用场景:

  • 高并发场景:当需要同时处理成千上万个连接时
  • I/O密集型应用:如Web服务器、代理服务器、消息中间件等
  • 实时性要求高:需要快速响应客户端请求

用一段伪代码来表示Reactor模型的基本工作流程:

// Reactor主循环
while (true) {// 使用I/O多路复用等待事件发生events = demultiplexer.wait();// 遍历所有就绪的事件for (event in events) {// 根据事件类型,调用对应的处理函数switch (event.type) {case ACCEPT:handleAccept(event);break;case READ:handleRead(event);break;case WRITE:handleWrite(event);break;case CLOSE:handleClose(event);break;}}
}// 处理新连接
function handleAccept(event) {// 接受新连接newConnection = accept(event.fd);// 设置新连接为非阻塞setNonBlocking(newConnection);// 注册读事件处理器,等待客户端发送数据demultiplexer.register(newConnection, READ, handleRead);
}// 处理读事件
function handleRead(event) {// 读取数据data = read(event.fd);if (data.length > 0) {// 处理请求response = processRequest(data);// 注册写事件处理器,准备发送响应demultiplexer.register(event.fd, WRITE, handleWrite);// 保存响应数据,供写事件处理器使用connections[event.fd].response = response;} else {// 客户端关闭连接handleClose(event);}
}// 处理写事件
function handleWrite(event) {// 获取要发送的响应数据response = connections[event.fd].response;// 发送数据write(event.fd, response);// 再次注册读事件处理器,等待客户端的下一个请求demultiplexer.register(event.fd, READ, handleRead);
}// 处理关闭连接
function handleClose(event) {// 从demultiplexer中移除demultiplexer.unregister(event.fd);// 关闭连接close(event.fd);// 清理相关资源delete connections[event.fd];
}

二、Reactor模型的三种实现方式

随着服务器要处理的连接数增多,单线程Reactor模式可能会成为瓶颈。因此,Reactor模型有三种常见的实现方式。

2.1 单Reactor单线程模型

image

单Reactor单线程模型是最简单的实现,所有工作都在同一个线程中完成:

  • Reactor线程负责监听连接、接受连接、读写数据和处理业务逻辑
  • 优点是简单,没有并发问题
  • 缺点是无法充分利用多核CPU,业务处理复杂时会造成整个服务阻塞

适用场景:连接数少且业务处理简单的场景,如Redis在单线程模式下的运行方式。

2.2 单Reactor多线程模型

image

单Reactor多线程模型相比单线程模型有了明显改进:

  • Reactor线程负责监听连接、接受连接和读写数据
  • 业务处理放在线程池中进行,避免了业务处理阻塞网络I/O
  • 优点是能够充分利用多核CPU,提高处理能力

缺点:

  • Reactor线程仍然是瓶颈,连接数量达到一定程度后,单个Reactor线程可能无法处理所有的I/O事件,导致性能下降
  • 多线程间共享数据需要考虑并发问题,增加了编程复杂度

2.3 主从Reactor多线程模型

image

主从Reactor多线程模型是三种模式中最强大的一种,也是目前主流高性能服务器采用的模型:

  • 主Reactor线程:只负责监听连接和接受新连接,然后将新连接分发给从Reactor线程
  • 从Reactor线程:负责处理已连接的socket上的读写事件
  • 业务线程池:负责执行具体业务逻辑,完全与网络I/O分离

优点:

  • 职责分明:主Reactor只负责接受连接,从Reactor只负责I/O,业务线程只负责处理业务逻辑
  • 可扩展性强:可以根据需要调整从Reactor和业务线程的数量
  • 减轻了主Reactor的负担:避免了单Reactor的性能瓶颈
  • 充分利用多核:不同Reactor和业务线程可以运行在不同的CPU核心上

三、Reactor模型的实际应用

基于Reactor模式的TCP服务器:

#include <iostream>
#include <vector>
#include <map>
#include <functional>
#include <memory>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#include <string.h>
#include <arpa/inet.h>// 事件处理器基类
class EventHandler {
public:EventHandler(int fd) : fd_(fd) {}virtual ~EventHandler() = default;virtual void handleRead() = 0;virtual void handleWrite() = 0;int getFd() const { return fd_; }protected:int fd_;
};// Reactor类
class Reactor {
public:Reactor() {// 创建epoll实例epoll_fd_ = epoll_create1(0);if (epoll_fd_ == -1) {std::cerr << "Failed to create epoll instance" << std::endl;exit(EXIT_FAILURE);}}~Reactor() {if (epoll_fd_ != -1) {close(epoll_fd_);}}// 注册事件处理器void registerHandler(std::shared_ptr<EventHandler> handler, uint32_t events) {struct epoll_event ev;ev.events = events;ev.data.fd = handler->getFd();if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, handler->getFd(), &ev) == -1) {std::cerr << "Failed to add fd to epoll" << std::endl;return;}handlers_[handler->getFd()] = handler;}// 修改事件void modifyHandler(std::shared_ptr<EventHandler> handler, uint32_t events) {struct epoll_event ev;ev.events = events;ev.data.fd = handler->getFd();if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, handler->getFd(), &ev) == -1) {std::cerr << "Failed to modify fd in epoll" << std::endl;return;}}// 移除事件处理器void removeHandler(std::shared_ptr<EventHandler> handler) {epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, handler->getFd(), nullptr);handlers_.erase(handler->getFd());}// 事件循环void eventLoop() {const int MAX_EVENTS = 10;struct epoll_event events[MAX_EVENTS];while (true) {int nfds = epoll_wait(epoll_fd_, events, MAX_EVENTS, -1);if (nfds == -1) {std::cerr << "epoll_wait error" << std::endl;break;}for (int i = 0; i < nfds; i++) {int fd = events[i].data.fd;auto it = handlers_.find(fd);if (it != handlers_.end()) {if (events[i].events & EPOLLIN) {it->second->handleRead();}if (events[i].events & EPOLLOUT) {it->second->handleWrite();}}}}}private:int epoll_fd_;std::map<int, std::shared_ptr<EventHandler>> handlers_;
};// 工具函数,设置socket为非阻塞
void setNonBlocking(int fd) {int flags = fcntl(fd, F_GETFL, 0);fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}// 接受连接处理器
class AcceptorHandler : public EventHandler {
public:AcceptorHandler(int port, Reactor& reactor) : EventHandler(-1), reactor_(reactor) {// 创建监听socketfd_ = socket(AF_INET, SOCK_STREAM, 0);if (fd_ < 0) {std::cerr << "Failed to create socket" << std::endl;exit(EXIT_FAILURE);}// 允许地址复用int opt = 1;setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));// 绑定地址struct sockaddr_in addr;memset(&addr, 0, sizeof(addr));addr.sin_family = AF_INET;addr.sin_addr.s_addr = INADDR_ANY;addr.sin_port = htons(port);if (bind(fd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) {std::cerr << "Failed to bind" << std::endl;exit(EXIT_FAILURE);}// 监听if (listen(fd_, SOMAXCONN) < 0) {std::cerr << "Failed to listen" << std::endl;exit(EXIT_FAILURE);}setNonBlocking(fd_);std::cout << "Server listening on port " << port << std::endl;}~AcceptorHandler() {if (fd_ >= 0) {close(fd_);}}void handleRead() override {struct sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);int client_fd = accept(fd_, (struct sockaddr*)&client_addr, &client_len);if (client_fd < 0) {std::cerr << "Failed to accept" << std::endl;return;}setNonBlocking(client_fd);char client_ip[INET_ADDRSTRLEN];inet_ntop(AF_INET, &client_addr.sin_addr, client_ip, sizeof(client_ip));std::cout << "New connection from " << client_ip << ":" << ntohs(client_addr.sin_port) << std::endl;// 创建连接处理器auto handler = std::make_shared<ConnectionHandler>(client_fd, reactor_);reactor_.registerHandler(handler, EPOLLIN | EPOLLET);}void handleWrite() override {// 作为接受器,不需要处理写事件}private:Reactor& reactor_;
};// 连接处理器
class ConnectionHandler : public EventHandler {
public:ConnectionHandler(int client_fd, Reactor& reactor): EventHandler(client_fd), reactor_(reactor), write_ready_(false) {}~ConnectionHandler() {if (fd_ >= 0) {close(fd_);}}void handleRead() override {char buffer[1024];ssize_t n;while ((n = read(fd_, buffer, sizeof(buffer))) > 0) {buffer[n] = '\0';write_buffer_ += buffer;}if (n == 0) {// 客户端关闭连接std::cout << "Connection closed" << std::endl;reactor_.removeHandler(shared_from_this());return;} else if (n < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {// 读取完毕if (!write_buffer_.empty()) {// 准备写入响应write_ready_ = true;reactor_.modifyHandler(shared_from_this(), EPOLLIN | EPOLLOUT | EPOLLET);}} else {std::cerr << "Read error" << std::endl;reactor_.removeHandler(shared_from_this());}}}void handleWrite() override {if (!write_ready_) {return;}ssize_t n = write(fd_, write_buffer_.c_str(), write_buffer_.size());if (n < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {// 稍后再试return;}std::cerr << "Write error" << std::endl;reactor_.removeHandler(shared_from_this());return;}// 写入完成write_buffer_.clear();write_ready_ = false;// 修改为只关注读事件reactor_.modifyHandler(shared_from_this(), EPOLLIN | EPOLLET);}private:Reactor& reactor_;std::string write_buffer_;bool write_ready_;
};int main() {Reactor reactor;// 创建接受器,监听8888端口auto acceptor = std::make_shared<AcceptorHandler>(8888, reactor);reactor.registerHandler(acceptor, EPOLLIN);// 启动事件循环reactor.eventLoop();return 0;
}

Reactor模式的核心思想:

  1. Reactor类:实现了事件循环,负责监听事件和分发事件
  2. EventHandler基类:提供了事件处理的接口
  3. AcceptorHandler:处理连接事件
  4. ConnectionHandler:处理已建立连接的读写事件

这是单Reactor单线程模型的实现。虽然简单,但已经能看出Reactor模式的优势,能够利用一个线程处理多个客户端连接,避免了传统的一连接一线程模型的资源浪费。

如果要实现主从Reactor多线程模型,我们需要:

  1. 创建一个主Reactor线程,专门负责接受连接
  2. 创建多个从Reactor线程,处理读写事件
  3. 创建业务线程池,处理具体业务逻辑
http://www.hskmm.com/?act=detail&tid=17148

相关文章:

  • 「LNOI2022」盒
  • 【文摘随笔】从业开发工作五年后,再读短篇《孔乙己》——年少不懂孔乙己,长大已成孔乙己
  • 为什么我选择了 PSM 敏捷认证?
  • 外键
  • 菜油
  • 索引
  • 存储过程
  • 编写msyql8.0.21 数据库批量备份脚本
  • 完整教程:基础算法---【差分】
  • Android 源码中如何生成一个platform JKS 文件?
  • WPF小知识
  • 后端面试八股(go 方向)
  • ArcGIS 不重叠且无缝的拓扑检查和修改
  • 2025/9/25
  • 读书笔记:揭开索引的两个常见误区
  • 国标GB28181平台EasyGBS如何赋能路网数字化管理与应急指挥?
  • 获取用户ip所在城市
  • 【Proteus仿真】AT89C51单片机串行数据转换为并行仿真 - 实践
  • 第13章 day14-15 Webpack逆向
  • Viper远程配置踩坑记录
  • 国产智能体脂秤PCBA方案设计
  • 第15章 day18 Ast系列篇
  • 微波雷达模块在智能家居中的具体应用案例有哪些?
  • Ubuntu 桌面快捷方式创建增加记录
  • 队列
  • arm64中的内存屏障指令
  • 三分
  • 完整教程:微服务基础2-网关路由
  • nginx ipv6 proxy配置
  • (三)数仓人必看!ODS 到 DWS 各层设计规范全解析,含同步/存储/质量核心要点