当前位置: 首页 > news >正文

基于 IOCP 的协程调度器——零基础深入浅出 C++20 协程

前言

上一篇《基于 epoll 的协程调度器》谈到如何基于 epoll 构建一个事件驱动的协程调度器,没有使用三方库的原因主要是为了避免引入额外复杂度,不过只演示 Linux 未免对非 Unix 平台的小伙伴有所不公,为此本文基于 Windows 的完成端口 (IO Completion Port:IOCP) 构建相同能力的 demo。

文章仍然遵守之前的创作原则:

* 选取合适的 demo 是头等大事

* 以协程为目标,涉及到的新语法会简单说明,不涉及的不旁征博引

* 若语法的原理非常简单,也会简单展开讲讲,有利于透过现象看本质,用起来更得心应手

上一篇文章里不光引入了基于事件的调度器,还说明了如何开启多文件并行、await_suspend 与试读的关系、singalfd 用于完美退出等话题,如果没有这些内容铺垫,看本文时会有很多地方难以理解,还没看过的小伙伴,墙裂建议先看那篇。

工具还是之前介绍过的 Compile Explorer,这里不再用到 C++ Insights ,主要是它不支持 Windows 平台,其实 Compiler Explorer 也只是编译,运行的话还是不太行,因为它的环境不支持像文件、网络之类的异步 IO,需要用户自行搭建开发环境。

基于完成端口的 IO 多路复用

上文中提到了 Unix 系统中多路复用接口的发展历程:分别经历了 select -> poll -> epoll/kqueue,Windows 则通过完成端口一统江山,其实它俩调用方式差不太多:

  epoll IOCP
初始化 epoll_create
CreateIoCompletionPort
关联句柄 epoll_ctl
CreateIoCompletionPort
等待并获取下一个事件 epoll_wait
GetQueuedCompletionStatus
投递事件 n/a (self pipe trick) PostQueuedCompletionStatus
销毁 close CloseHandle

而在可等待对象上,IOCP 则丰富的多:

* 文件 I/O 事件​​
* 文件系统变更
* 套接字(Socket)事件​​
* 命名管道(Named Pipe)事件​​
* 设备 I/O 事件​​
* 定时器事件(结合 Waitable Timer)​​

这方面能与它相提并论的恐怕只有 kqueue 了。有了上面的铺垫再参考之前 epoll 的实现,直接上 demo 源码:

#include <coroutine>
#include <unordered_map>
#include <windows.h>
#include <vector>
#include <stdexcept>
#include <iostream>
#include <sstream>
#include <memory>struct Task {struct promise_type {Task get_return_object() { return {}; }std::suspend_never initial_suspend() { return {}; }std::suspend_never final_suspend() noexcept { return {}; }void return_void() {}void unhandled_exception() { std::terminate(); }};
};class IocpScheduler {
private:HANDLE iocp_handle;std::unordered_map<HANDLE, std::coroutine_handle<>> io_handles;public:IocpScheduler() {iocp_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);if (iocp_handle == NULL) {throw std::runtime_error("CreateIoCompletionPort failed");}}~IocpScheduler() {CloseHandle(iocp_handle);}void register_io(HANDLE file_handle, std::coroutine_handle<> handle) {if (io_handles.find(file_handle) == io_handles.end()) {io_handles[file_handle] = handle;if (CreateIoCompletionPort(file_handle, iocp_handle, (ULONG_PTR)file_handle, 0) == NULL) {throw std::runtime_error("CreateIoCompletionPort failed to associate file handle");}}}void run() {while (true) {DWORD bytes_transferred = 0;ULONG_PTR completion_key = 0;LPOVERLAPPED overlapped = nullptr;BOOL success = GetQueuedCompletionStatus(iocp_handle,&bytes_transferred,&completion_key,&overlapped,INFINITE);if (completion_key != 0) {HANDLE ready_handle = (HANDLE)completion_key;if (auto it = io_handles.find(ready_handle); it != io_handles.end()) {it->second.resume();}}}}
};struct AsyncReadAwaiter {IocpScheduler& sched;HANDLE file_handle;std::unique_ptr<char[]> buffer;DWORD buffer_size;OVERLAPPED overlapped;DWORD bytes_read;AsyncReadAwaiter(IocpScheduler& s, HANDLE file, DWORD size): sched(s), file_handle(file), buffer_size(size), bytes_read(0) {buffer = std::make_unique<char[]>(size);ZeroMemory(&overlapped, sizeof(OVERLAPPED));}bool await_ready() const {return false;}void await_suspend(std::coroutine_handle<> h) {sched.register_io(file_handle, h);if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {DWORD error = GetLastError();if (error != ERROR_IO_PENDING) {std::stringstream ss;ss << "ReadFile failed, error " << error;throw std::runtime_error(ss.str());}}}std::string await_resume() {DWORD bytes_transferred = 0;if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {DWORD error = GetLastError();std::stringstream ss;ss << "GetOverlappedResult failed, error " << error;throw std::runtime_error(ss.str());}return std::string(buffer.get(), bytes_transferred);}
};Task async_read_file(IocpScheduler& sched, const char* path) {HANDLE file_handle = CreateFileA(path,GENERIC_READ,FILE_SHARE_READ,NULL,OPEN_EXISTING,FILE_FLAG_OVERLAPPED,NULL);if (file_handle == INVALID_HANDLE_VALUE) {std::stringstream ss;ss << "CreateFile failed, error " << GetLastError();throw std::runtime_error(ss.str());}while (true) {auto data = co_await AsyncReadAwaiter(sched, file_handle, 4096);std::cout << "Read " << data.size() << " bytes\n";if (data.size() == 0) {break;}}CloseHandle(file_handle);
}int main(int argc, char* argv[]) {if (argc < 2) {std::cout << "Usage: sample file_path" << std::endl;return 1;}IocpScheduler scheduler;async_read_file(scheduler, argv[1]);scheduler.run();return 0;
}

先看编译:

image

Compile Explorer 中指定最新的 msvc 编译器和 C++20 选项可以编译通过,注意在 Windows 中选项指定的语法与 Unix 大相径庭,别弄错了。

一点一点降低版本尝试,发现能编译这段代码的最低版本是 msvc19.29,对应 vs16.11,如果你需要在本地安装测试环境的话,稳妥起见安装 msvc19.30、对应 vs17.0 也就是  VS2022 比较好,如果本地只有 VS2019,需要升级到第五个也就是最后一个发行版才可以。

image

接下来创建一个简单的控制台应用包含上面的源文件,需要配置一下 C++ 语言标准:

image

就可以编译生成可执行文件了,在同目录准备一个文本文件 (test.txt) 进行测试:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
...
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 409
PS D:\code\iocp_coroutine\Debug>

居然死循环了。

指定偏移量

同样的代码逻辑,Unix 上没问题 Windows 上却死循环,主要原因是:前者底层使用的是管道,与 socket 之类相似是一个流 (stream),因此没有读写偏移量的说法,每次从开头获取就可以了;后者使用的是文件,如果不指定偏移量,每次都会从位置 0 读取,有的读者可能问了,为何不能使用当前文件的读取位置呢?这是因为 Windows 上的多路复用底层是彻彻底底的异步架构,必需每次为 ReadFile 指定一个偏移量,而不能够使用当前文件的偏移量。

修复的方法很简单,为 ReadFile 的 overlapped 参数的 Offset & OffsetHigh 字段指定要读取数据的偏移量即可:

...struct AsyncReadAwaiter {IocpScheduler& sched;HANDLE file_handle;std::unique_ptr<char[]> buffer;DWORD buffer_size;

增加一个引用成员用来记录当前请求的偏移值,LARGE_INTEGER 可以理解为 uint64 的结构化表达

    LARGE_INTEGER &offset; OVERLAPPED overlapped;DWORD bytes_read;AsyncReadAwaiter(IocpScheduler& s, HANDLE file, LARGE_INTEGER &off, DWORD size)

在构造函数中初始化新成员,这个值需要从外部传入,读取成功后更新之,以便跨等待对象使用

        : sched(s), file_handle(file), buffer_size(size), offset(off), bytes_read(0) {buffer = std::make_unique<char[]>(size);ZeroMemory(&overlapped, sizeof(OVERLAPPED));}bool await_ready() const {return false;}void await_suspend(std::coroutine_handle<> h) {sched.register_io(file_handle, h);

每次请求前设置 overlapped 的偏移字段,并增加调试日志输出以便观察

        overlapped.Offset = offset.LowPart; overlapped.OffsetHigh = offset.HighPart; std::cout << "ReadFile from " << offset.QuadPart << std::endl;if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {DWORD error = GetLastError();if (error != ERROR_IO_PENDING) {std::stringstream ss;ss << "ReadFile failed, error " << error;throw std::runtime_error(ss.str());}}}std::string await_resume() {DWORD bytes_transferred = 0;if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {DWORD error = GetLastError();std::stringstream ss;ss << "GetOverlappedResult failed, error " << error;throw std::runtime_error(ss.str());}

读取成功后,递增相应的偏移量

        offset.QuadPart += bytes_transferred; return std::string(buffer.get(), bytes_transferred);}
};Task async_read_file(IocpScheduler& sched, const char* path) {HANDLE file_handle = CreateFileA(path,GENERIC_READ,FILE_SHARE_READ,NULL,OPEN_EXISTING,FILE_FLAG_OVERLAPPED,NULL);if (file_handle == INVALID_HANDLE_VALUE) {std::stringstream ss;ss << "CreateFile failed, error " << GetLastError();throw std::runtime_error(ss.str());}

在外层循环中保存这个偏移量,以便可以持久化使用,初始值为 0

    LARGE_INTEGER offset = { 0 }; while (true) {auto data = co_await AsyncReadAwaiter(sched, file_handle, offset, 4096);std::cout << "Read " << data.size() << " bytes\n";if (data.size() == 0) {break;}}CloseHandle(file_handle);
}
...

再次运行程序,可以输出读取的内容了:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552

但是额外的,也收到了一个崩溃提示:

image

处理文件 EOF

记得之前讲到协程体整个是包在编译的 try...catch 代码块中的,这里直接崩溃难道是 msvc 的异常处理没起作用?挂上调试器看看崩溃堆栈:

image

看起来是命中 promise 对象的 unhandle_exception,这里调用的 terminate 导致崩溃框弹出,而 unhandled_exception 恰恰是编译器捕获了 throw 抛出的异常,与直觉刚好相反。经过排查,唯一可能抛出异常的位置是这里:

    std::string await_resume() {DWORD bytes_transferred = 0;if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {DWORD error = GetLastError();std::stringstream ss;ss << "GetOverlappedResult failed, error " << error;

这里加打一行日志

            std::cerr << ss.str() << std::endl;throw std::runtime_error(ss.str());}offset.QuadPart += bytes_transferred; return std::string(buffer.get(), bytes_transferred);}

新的输出果然提示这里返回了错误:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552
GetOverlappedResult failed, error 38

错误码 38 对应的是 ERROR_HANDLE_EOF表示文件已到末尾,相比 epoll 管道不关心数据结尾的问题,IOCP 读文件还需要额外增加一些处理,另外在抛异常时,msvc 相比 clang 的显示不太友好,需要在抛出异常前补上 stderr 的打印,修复后的代码如下:

    std::string await_resume() {DWORD bytes_transferred = 0;if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {DWORD error = GetLastError();

判断错误类型,如果是文件 EOF,直接返回空数据,上层会进行判断,从而退出读取循环

            if (error != ERROR_HANDLE_EOF) {std::stringstream ss;ss << "GetOverlappedResult failed, error " << error;std::cerr << ss.str() << std::endl;throw std::runtime_error(ss.str());}else {return ""; }}offset.QuadPart += bytes_transferred; return std::string(buffer.get(), bytes_transferred);}

下面是新的输出:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552
Read 0 bytes

不再报错了。

多文件并行

上面的例子虽然通过多次读取展示了协程多次唤醒的过程,但没有展示多个 IO 句柄并发的能力,下面稍加改造,同时读取多个文件:

Task async_read_file(IocpScheduler& sched, const char* path) {HANDLE file_handle = CreateFileA(path,GENERIC_READ,FILE_SHARE_READ,NULL,OPEN_EXISTING,FILE_FLAG_OVERLAPPED,NULL);if (file_handle == INVALID_HANDLE_VALUE) {std::stringstream ss;ss << "CreateFile failed, error " << GetLastError();std::cerr << ss.str() << std::endl; throw std::runtime_error(ss.str());}LARGE_INTEGER offset = { 0 };while (true) {auto data = co_await AsyncReadAwaiter(sched, file_handle, offset, 4096);

输出文件句柄以区别从不同文件读取的数据

        std::cout << "Read [" << file_handle << "] " << data.size() << " bytes\n";if (data.size() == 0) {break;}}CloseHandle(file_handle);
}int main(int argc, char* argv[]) {if (argc < 3) {std::cout << "Usage: sample file1 file2" << std::endl;return 1;}IocpScheduler scheduler;async_read_file(scheduler, argv[1]);

多个文件只需要多次调用协程即可,从这里可以感受到协程强大的拓展性

    async_read_file(scheduler, argv[2]);scheduler.run();return 0;
}

下面是新的输出:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt test2.txt
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 456 bytes
Read [0000010C] 456 bytes
Read [00000108] 0 bytes
Read [0000010C] 0 bytes

为了便于对比,这里将读取 buffer 的默认尺寸改为 1024,并去掉了调试日志。可以看出在 IOCP 里两个文件基本是轮着读的,公平性还是不错的。

await_suspend & 试读

上文中提到,通过在 await_ready 或 await_suspend 中增加一些代码,就可以支持数据试读,从而在某些场景下提升数据吞吐能力。下面看看 IOCP 是如何实现的,这里只演示 await_suspend 方式:

    bool await_suspend(std::coroutine_handle<> h) {sched.register_io(file_handle, h);overlapped.Offset = offset.LowPart;overlapped.OffsetHigh = offset.HighPart;//std::cout << "ReadFile from " << offset.QuadPart << std::endl;if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {DWORD error = GetLastError();if (error != ERROR_IO_PENDING) {std::stringstream ss;ss << "ReadFile failed, error " << error;std::cerr << ss.str() << std::endl; throw std::runtime_error(ss.str());}}

ReadFile 本身具有试读能力,当任务可以立即完成时,它将返回 TRUE,bytes_read 参数将返回读取的数据长度;这里加入判断,若立即读取成功,则返回 false 不挂起协程

        else {// if immediately success, not hangupstd::cout << "immediately success, read = " << bytes_read << std::endl; }return bytes_read > 0 ? false : true;}std::string await_resume() {DWORD bytes_transferred = 0;

resume 时先判断是否为试读场景,是的话直接返回数据就可以了

        if (bytes_read > 0) {bytes_transferred = bytes_read;}else {if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {DWORD error = GetLastError();if (error != ERROR_HANDLE_EOF) {std::stringstream ss;ss << "GetOverlappedResult failed, error " << error;std::cerr << ss.str() << std::endl;throw std::runtime_error(ss.str());}else {return "";}}}offset.QuadPart += bytes_transferred; return std::string(buffer.get(), bytes_transferred);}

从这里也可以看出,Windows 对直接成功的支持是比较好的,不必像 Unix 那样来回倒腾数据,下面是新版本输出:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 456 bytes
Read [000000FC] 0 bytes

运行了多次没有试出来,可能 Windows 只对网络等真正异步的场景才会有立即成功的情况吧。

PostQueuedCompletionStatus & 完美退出

上面的 demo 如果遇到大文件目前只能通过 Ctrl C 强制杀死,毕竟调度器的 run 是个死循环没法退出,下面对它进行一番改造,看看能否实现完美退出

IocpScheduler g_scheduler;

由于需要在信号响应函数中通知调度器退出,这里将它做为一个全局变量,工程化一点的话可以改成单例,这里偷个懒

int main(int argc, char* argv[]) {if (argc < 2) {std::cout << "Usage: sample file" << std::endl;return 1;}

初始化时捕获 SiGINT 以便响应 Ctrl C

    signal(SIGINT, on_user_exit); async_read_file(g_scheduler, argv[1]);g_scheduler.run();return 0;
}

在信号响应函数中调用调度器退出接口实现完美退出

void on_user_exit(int signo) {g_scheduler.exit(signo); 
}class IocpScheduler {...

调度器中新增的退出接口,无脑给 IOCP 队列投递通知,注意 key 参数给的是 0,以区别于一般的文件读取事件

    void exit(int signo) {std::cout << "caught signal " << signo << ", prepare to quit!" << std::endl; PostQueuedCompletionStatus(iocp_handle, 0, (ULONG_PTR)0, NULL);}void run() {while (true) {DWORD bytes_transferred = 0;ULONG_PTR completion_key = 0;LPOVERLAPPED overlapped = nullptr;BOOL success = GetQueuedCompletionStatus(iocp_handle,&bytes_transferred,&completion_key,&overlapped,INFINITE);

收到事件后,优先检测是否为退出事件,命中的话直接 break while 循环退出 main

            if (completion_key == 0) {std::cout << "IOCP ready to quit" << std::endl; break; }else {HANDLE ready_handle = (HANDLE)completion_key;if (auto it = io_handles.find(ready_handle); it != io_handles.end()) {it->second.resume();}}}}~IocpScheduler() {

调度器析构中增加协程的销毁,防止内存、句柄泄漏

        for(auto handle : io_handles) {std::cout << "coroutine destroy" << std::endl;handle.second.destroy();}CloseHandle(iocp_handle);}
};

下面是新版输出:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 456 bytes
Read [00000110] 0 bytes
caught signal 2, prepare to quit!
IOCP ready to quit
coroutine destroy

用户按下 Ctrl C 后,可以实现完美退出啦! 

结语

本文介绍了一种基于 IOCP 多路复用的协程调度器,除此之外还说明了如何妥善处理文件偏移、文件 EOF、await_suspend 与试读写、PostQueuedCompletionStatus 与进程完美退出等,可用于构建工业级强度的代码。

最后,由于本文中 demo 经历多次迭代,想要复制最终版进行验证的小伙伴,可以 follow 这个开源 git 库获取:cpp20coroutine。

下一篇来看下,如何将现有的基于回调的异步库与 C++20 协程无缝糅合。

参考 

[1]. 如果异步完成,ReadFile()是否总是返回FALSE?

[2]. 系统错误代码 (0-499)

http://www.hskmm.com/?act=detail&tid=13152

相关文章:

  • Gitee PPM风险矩阵:数字化转型中的项目管理预警雷达
  • 同一个灰色,POI取出来却是白色:一次Excel颜色解析的踩坑记录
  • 坤驰科技携国产化MTCA解决方案,亮相大科学装置控制系统研讨会
  • 找出所有项目引用了哪些 NuGet 包、版本号、对应项目路径,并筛选出“同一个包名但版本不同”的情况。
  • PC与基恩士PLC通信的C#实现
  • Excel 表格技能
  • labelme标注后的json文件和原图同步按角度旋转
  • rk3588的ai功能和deepseek
  • EPSON L1300打印机清零教程
  • 「线性代数」矩阵运算与初等变换
  • 移动号码线上复机
  • Uni-App 使用android studio打包最新教程
  • tomcat CPU数量和线程数的关系
  • NASA运货飞船天鹅座再次推迟,航天任务为什么总是“彩排”不断
  • Centos系统切换为光盘本地源
  • 基于Hilbert-Huang变换(HHT)的模态分解与瞬时频率计算
  • NIO
  • python处理Excel单机小程序:匹数据,增强版VLookup
  • var sql 的不同用法
  • CF623B Array GCD
  • Python爬虫实现双色球历史数据抓取
  • 酵母细胞工厂全球调控策略研究进展:从遗传编辑到智能响应
  • Avalonia 根据绑定的数据类型动态选择模板
  • PyTorch图神经网络(一)
  • Python版Sigstore稳定版发布:软件供应链签名新标准
  • 网速带宽概念
  • 跨网传输软件:打通数据孤岛,保障安全流通!
  • 「KDOI-07」能量场
  • 基于LQR控制器的柔性机械臂抑振
  • 202507_QQ_caidundun