网络编程(17)——asio多线程模型IOThreadPool
十七、day17
之前我们介绍了IOServicePool的方式,一个IOServicePool开启n个线程和n个iocontext,每个线程内独立运行iocontext, 各个iocontext监听各自绑定的socket是否就绪,如果就绪就在各自线程里触发回调函数。为避免线程安全问题,我们将网络数据封装为逻辑包投递给逻辑系统,逻辑系统有一个单独线程处理,这样将网络IO和逻辑处理解耦合,极大的提高了服务器IO层面的吞吐率。
今天给大家介绍asio多线程模式的第二种多线程模式IOThreadPool,我们只初始化一个iocontext用来监听服务器的读写事件,包括新连接到来的监听也用这个iocontext。只是我们让iocontext.run在多个线程中调用,启动一个 io_context,由多个线程共享,这样回调函数就会被不同的线程触发,从这个角度看回调函数被并发调用了。
1. IOThreadPool实现
1) IOThreadPool.h
#pragma once
#include "Singleton.h"
#include <boost/asio.hpp>class AsioThreadPool : public Singleton<AsioThreadPool>
{
public:friend class Singleton<AsioThreadPool>;~AsioThreadPool() {}AsioThreadPool& operator = (const AsioThreadPool&) = delete;AsioThreadPool(const AsioThreadPool&) = delete;boost::asio::io_context& GetIOService();void Stop();
private:AsioThreadPool(int threadNum = std::thread::hardware_concurrency());boost::asio::io_context _service;std::unique_ptr<boost::asio::io_context::work> _work;std::vector<std::thread> _threads;
};
- AsioThreadPool也是单例模式,需继承单例模板类Singleton<T>
- _service:AsioThreadPool多线程模式中,只有一个io_context进行不同线程的调度,所以仅初始化一个ioc
- _work:work的数量与ioc相同,有且仅有一个
- _threads:线程数,默认与cpu核数相同
AsioThreadPool继承 Singleton 模板类时,Singleton 类的拷贝构造函数和赋值运算符已经被删除,这意味着AsioThreadPool也会继承这些删除操作。因此,派生类默认不能被拷贝或赋值,也不需要再次delte拷贝或者赋值,但是可以写上增加代码阅读性。
2)AsioThreadPool构造函数
AsioThreadPool::AsioThreadPool(int threadNum) : _work(new boost::asio::io_context::work(_service)){for (int i = 0; i < threadNum; ++i) {_threads.emplace_back([this]() {_service.run();});}
}
注意:work是通过std::unique_ptr进行管理的,所以work不能被拷贝,仅仅可以通过移动语义转移。
将AsioThreadPool池中唯一的ioc与work绑定,防止ioc返回,然后启动threadNum个线程,每个线程中都调用_service.run()。
_service.run函数内部就是从iocp或者epoll获取就绪描述符和绑定的回调函数,进而调用回调函数,因为回调函数是在不同的线程里调用的,所以会存在不同的线程调用同一个socket的回调函数的情况。
_service.run内部在Linux环境下调用的是epoll_wait返回所有就绪的描述符列表,在windows上会循环调用GetQueuedCompletionStatus函数返回就绪的描述符,二者原理类似,进而通过描述符找到对应的注册的回调函数,然后调用回调函数。
二者的流程如下:
a. IOCP
- IOCP的使用主要分为以下几步:
1 创建完成端口(iocp)对象
2 创建一个或多个工作线程,在完成端口上执行并处理投递到完成端口上的I/O请求
3 Socket关联iocp对象,在Socket上投递网络事件
4 工作线程调用GetQueuedCompletionStatus函数获取完成通知封包,取得事件信息并进行处理。该函数会阻塞,直到有I/O操作完成并返回相应的事件信息,线程接收到通知后就可以处理对应的I/O请求。
b. epoll
- 调用epoll_creat在内核中创建一张epoll表,这张 epoll 表用于管理要监视的文件描述符
- 开辟一片包含n个epoll_event大小的连续空间,这个空间的大小是预期要监听的事件数量 n,以便存放准备好的事件信息。
- 将要监听的socket(或其他文件描述符)注册到epoll表里,可以指定需要监听的事件类型(如可读、可写等),并将其与之前创建的 epoll 实例关联。
- 调用epoll_wait,传入之前我们开辟的连续空间,这一调用会阻塞,直到有事件发生。一旦有事件就绪,epoll_wait 会返回就绪的 epoll_event 列表,并将对应的 Socket 信息写入之前开辟的内存空间中,供后续处理。
3)其他函数
boost::asio::io_context& AsioThreadPool::GetIOService() {return _service;
}void AsioThreadPool::Stop() {_work.reset();for (auto& t : _threads) {t.join(); }
}
- GetIOService()用于返回多线程池中唯一的上下文服务
- Stop()用于将work释放,以便ioc可以返回
2. 隐患及如何解决
IOThreadPool模式有一个隐患,同一个socket的就绪后,触发的回调函数可能在不同的线程里,比如第一次是在线程1,第二次是在线程3,如果这两次触发间隔时间不大,那么很可能出现不同线程并发访问数据的情况,比如在处理读事件时,第一次回调触发后我们从socket的接收缓冲区读数据出来,第二次回调触发,还是从socket的接收缓冲区读数据,就会造成两个线程同时从socket中读数据的情况,会造成数据混乱。因为多个线程都会执行io_context.run(),每次调用都会返回一些就绪的回调函数,比如,线程1调用io_context.run()返回ReadHandler1以及绑定的socket,线程2调用io_context.run()返回ReadHandler2以及绑定的socket,线程3调用io_context.run()返回ReadHandler3以及绑定的socket,但是ReadHandler1和ReadHandler3都是session1调用的,那么可能会导致多个线程同时处理相同的回调,导致线程安全问题
注:但如果回调函数被派发到逻辑队列中或者进行加锁,那么就不会存在不同线程访问同一个socket数据的线程安全问题。这里的隐患是不通过队列或者加锁,处理逻辑问题会存在线程安全问题。
1)通过strand改进
在多线程环境中触发回调函数时,我们可以使用 asio 提供的串行类 strand 来进行封装,从而实现串行调用。其基本原理是:每个线程在调用函数时,不直接执行该函数,而是将要调用的函数投递到由 strand 管理的队列中;随后,由一个统一的线程从队列中取出并调用这些回调函数。通过这种方式,函数调用是串行进行的,从而解决由于线程并发带来的安全问题。
使用strand进行封装
图中当socket就绪后并不是由多个线程调用每个socket注册的回调函数,而是将回调函数投递给strand管理的队列,再由strand统一调度派发。为了让回调函数被派发到strand的队列,我们只需要在注册回调函数时加一层strand的包装即可。
strand 实际上是将回调函数放入同一个队列(如果有逻辑层或者通过加锁处理回调,那么就不需要strand),以确保线程安全。因此,在描述符就绪后,相应的回调会被放入该队列中进行处理。虽然使用 strand 可以保证在多线程环境下的安全性,但在触发时仍然是单线程的,这限制了整体性能。因此,尽管多线程可以提高读写事件的派发效率,但整体性能通常不如IOService_pool 这种方式更优。在实际工作中,我们通常选择使用 IOService_pool,因为它能够更好地平衡并发和性能。
a. CSession类中新增成员变量_strand
boost::asio::strand<boost::asio::io_context::executor_type> _strand;
b. 修改CSession的构造函数
CSession::CSession(boost::asio::io_context& io_context, CServer* server):_socket(io_context), _server(server), _b_close(false),_b_head_parse(false), _strand(io_context.get_executor()){boost::uuids::uuid a_uuid = boost::uuids::random_generator()();_uuid = boost::uuids::to_string(a_uuid);_recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}
可以看到_strand的初始化是放在初始化列表里,利用io_context.get_executor()返回的执行器构造strand。因为在asio中无论iocontext还是strand,底层都是通过executor调度的,我们将他理解为调度器就可以。如果多个iocontext和strand的调度器是一个,那他们的消息派发统一由这个调度器执行。我们利用iocontext的调度器构造strand,这样他们统一由一个调度器管理。在绑定回调函数的调度器时,我们选择strand绑定即可。
比如我们在Start函数里添加绑定 ,将回调函数的调用者绑定为_strand
void CSession::Start(){::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),boost::asio::bind_executor(_strand, std::bind(&CSession::HandleRead, this,std::placeholders::_1, std::placeholders::_2, SharedSelf())));
}
修改前的代码为:
void CSession::Start() {memset(_data, 0, MAX_LENGTH); // 缓冲区清零// 从套接字中读取数据,并绑定回调函数headle_read_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2,shared_from_this()));//_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),// boost::asio::bind_executor(_strand, // std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2,shared_from_this())));
}
同样的道理,在所有收发的地方,都将调度器绑定为_strand, 比如发送部分我们需要修改为如下
auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), boost::asio::bind_executor(_strand, std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf())));
3. 客户端
int main()
{try {auto pool = AsioThreadPool::GetInstance();boost::asio::io_context ioc;boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);// 必须异步等待,否则建立线程进行处理signals.async_wait([&ioc, pool](const boost::system::error_code& error, int signal_number) {if (!error) {std::cout << "Signal " << signal_number << " received." << std::endl;ioc.stop(); // 停止 io_contextpool->Stop();std::unique_lock<std::mutex> lock(mutex_quit);bstop = true;cond_quit.notify_one();}});CServer s(pool->GetIOService(), 10086);ioc.run();{std::unique_lock<std::mutex> lock(mutex_quit);while (!bstop) {cond_quit.wait(lock);}}}catch (std::exception& e) {std::cerr << "Exception: " << e.what() << '\n';}boost::asio::io_context io_context;
}
之所以用条件变量是因为传入Server的ioc是从线程池中获得的,Server初始化之后并没有调用ioc.run(),线程池中的ioc是运行在自己的线程中的,下面这段是运行在主线程中的,是为了防止主线程结束了而子线程还在运行,所以必须设置条件变量防止主线程结束
{std::unique_lock<std::mutex> lock(mutex_quit);while (!bstop) {cond_quit.wait(lock);}}
如果不设置条件变量使主线程挂起,那么当主线程结束后,如果server还没有跑起来,server相当于子线程,那么会直接让子线程结束
CServer s(pool->GetIOService(), 10086);
单独生成一个ioc运行监听信号,当收到两个退出信号时,服务器退出,而server的ioc是从线程池中获取的。当获取到退出信号时,首先将监听信号的ioc退出,然后将线程池的unique_ptr释放work,使得线程池中的ioc能退出,最后上锁然后唤醒主线程,主线程结束。
std::cout << "Signal " << signal_number << " received." << std::endl;ioc.stop(); // 停止 io_contextpool->Stop();std::unique_lock<std::mutex> lock(mutex_quit);bstop = true;cond_quit.notify_one();
4. 总结
1. 在IOServicePool示例中,使用main函数中定义的io_context来监听异常信号signal,并且用于初始化Server来监听接收信号;但在IOThreadPool示例中,main函数定义的ioc仅仅用于监听异常信号,而初始化Server启动异步接收async_accept是通过线程池中的ioc。
这是因为
第一种方式调用了n+1个ioc.run(),第二种方式调用了2个ioc.run()
- 前者在main函数中处理信号监听和Server任务,当main中的ioc接收到客户端连接之后,生成一个子session,并从IOServicePool线程池中取一个ioc用于管理子Session的收发。也就是说,main中的ioc用于监听信号和server任务处理,而线程池中的ioc用于处理子Session的收发。main中的ioc一旦停止,那么服务器就会停止客户端的连接服务,session任务不会在增加,而线程池中的ioc全部停止后,线程的收发就会停止。
- 后者再main函数中处理监听信号任务,而server任务处理交给了线程池IOThreadPool中唯一的ioc进行处理。也就是说,让一个ioc单独监听信号,另一个ioc运行在多线程中做收发。main中的ioc停止后,仅仅只是服务器的信号监听停止了,而server的任务处理以及session的收发仍然会持续。
此外,他们的退出机制也有些不同。
- 第一种方式:信号处理函数在
ioc
上注册,当收到信号时,会调用ioc.stop()
停止io_context
,并调用线程池的Stop
方法停止线程池的运行。这种方式不需要额外的同步机制,因为主线程就是ioc.run()
的执行线程,一旦停止,就会退出。 - 第二种方式:信号处理函数在主线程的
ioc
上注册,但实际处理的 I/O 事件由线程池中的io_context
完成。为了协调多线程退出,需要使用mutex
和条件变量(cond_quit
)来确保所有线程安全地退出。信号处理函数在停止io_context
后,会通知等待在条件变量上的线程退出,从而确保整个程序能够正确结束。
2. 哪一种多线程模式的效率最高?
直观上来说可能二者的效率都差不多?其实IOServicePool的效率略高于IOThreadPool。
IOThreadPool 通常会涉及到 strand,strand 的作用是将回调函数放入同一个队列中执行,以确保线程安全(如果逻辑层或者通过加锁已经处理了回调的同步问题,那么就不需要使用 strand)。在这种机制下,当描述符就绪时,相应的回调会被放入这个队列中依次处理。虽然 strand 可以确保多线程环境下的安全性,但由于回调函数的执行是串行的,所以在事件触发时仍然是单线程操作,这在一定程度上限制了整体性能的提升。
尽管多线程模式确实能提高读写事件的派发效率,但由于 strand 的串行限制,整体性能通常不如使用 IOService_pool 的方式。在实际工作中,我们更倾向于使用 IOService_pool,因为它在保证线程安全的同时,能够更好地利用并发性,提升程序性能和效率。
相关文章:

网络编程(17)——asio多线程模型IOThreadPool
十七、day17 之前我们介绍了IOServicePool的方式,一个IOServicePool开启n个线程和n个iocontext,每个线程内独立运行iocontext, 各个iocontext监听各自绑定的socket是否就绪,如果就绪就在各自线程里触发回调函数。为避免线程安全问题…...

【rust/egui/android】在android中使用egui库
文章目录 说在前面AndroidStudio安装编译安装运行问题 说在前面 操作系统:windows11java版本:23android sdk版本:35android ndk版本:22rust版本: AndroidStudio安装 安装AndroidStudio是为了安装sdk、ndk,…...

Git---Git打标签
打标签 像其他版本控制系统(VCS)一样,Git 可以给仓库历史中的某一个提交打上标签,以示重要。 比较有代表性的是人们会使用这个功能来标记发布结点( v1.0 、 v2.0 等等)。 在本节中,你将会学习如…...
深入理解Transformer的笔记记录(精简版本)---- Transformer
自注意力机制开启大规模预训练时代 1 从机器翻译模型举例 1.1把编码器和解码器联合起来看待的话,则整个流程就是(如下图从左至右所示): 1.首先,从编码器输入的句子会先经过一个自注意力层(即self-attention),它会帮助编码器在对每个单词编码时关注输入句子中的的其他单…...

Ubuntu 更换内核版本
更换内核脚本 这里以更换 5.15.0-88-generic 版本内核为例 cat kernel.sh#!/bin/bashapt install linux-image-5.15.0-88-generic # Ubuntu内核切换脚本# 检查是否具有root权限 if [[ $(id -u) -ne 0 ]]; thenecho "请以root身份运行此脚本。"exit 1 fi# 检查系统是…...

博士找高校教职避坑指南:史上最全的避坑秘籍
在学术的海洋中遨游多年,博士们终于要踏上寻找高校教职的征程。这不仅是职业生涯的新起点,更是一场充满未知与挑战的冒险。今天,就让我们来聊聊那些在寻找高校教职时需要避开的坑,希望能为你的求职之路保驾护航。 1. 薪资结构&am…...

Study-Oracle-11-ORALCE19C-ADG集群搭建
一路走来,所有遇到的人,帮助过我的、伤害过我的都是朋友,没有一个是敌人。 一、ORACLE--ADG VS ORACLE--DG的区别 1、DG是Oracle数据库的一种灾难恢复和数据保护解决方案,它通过在主数据库和一个或多个备用数据库之间实时复制数据,提供了数据的冗余备份和故障切换功能。…...

【C++】map详解(键值对的概念,与multimap的不同)
目录 00.引言 set 和 map 的区别 键值对的概念 01.map容器 主要特性 常用操作 主要用途 02.multimap容器 特性 常用操作 用途 00.引言 set 和 map 的区别 set 和 map 都是C标准模板库(STL)中的容器,它们的区别如下:…...

私域电商新纪元:消费增值模式引领百万业绩飞跃
各位朋友,我是吴军,专注于带领大家深入探索私域电商领域的非凡魅力与潜在机会。 今天,我想与大家分享一个鼓舞人心的真实故事。在短短的一个月内,我们的合作伙伴实现了业绩的飞跃,突破百万大关,并且用户活跃…...

AAA Mysql与redis的主从复制原理
一 :Mysql主从复制 重要的两个日志文件:bin log 和 relay log bin log:二进制日志(binnary log)以事件形式记录了对MySQL数据库执行更改的所有操作。 relay log:用来保存从节点I/O线程接受的bin log日志…...

结合大语言模型的机械臂抓取操作学习
一、 大语言模型的机械臂抓取操作关键步骤 介绍如何基于大语言模型实现机械臂在PyBullet环境中的抓取操作,涵盖机器人运动学、坐标系转换、抓取候选位姿生成、开放词汇检测以及大语言模型代码生成等模块。 1. 机器人正逆运动学基本概念 正运动学: 已知机器人的关节…...

数据结构-二叉树_堆
一. 树的概念 树在我们的日常生活中随处可见,人们将生活中的树转换成存放数据的树形结构,就成了数据结构中的“树”。 如上图所示,自然界中的树有树根,有树枝,有树叶,当我们将其转换成树形结构时…...

Vscode+Pycharm+Vue.js+WEUI+django火锅(三)理解Vue
新创建的Vue项目里面很多文件,对于新手,老老实实做一下了解。 1.框架逻辑 框架的逻辑都是相通的,花点时间理一下就清晰了。 2.文件目录及文件 创建好的vue项目下,主要的文件和文件夹要先认识一下,并与框架逻辑对应起…...

溯变:守护天使 | OPENAIGC开发者大赛企业组优秀作品
在第二届拯救者杯OPENAIGC开发者大赛中,涌现出一批技术突出、创意卓越的作品。为了让这些优秀项目被更多人看到,我们特意开设了优秀作品报道专栏,旨在展示其独特之处和开发者的精彩故事。 无论您是技术专家还是爱好者,希望能带给…...

android中byte[] buf没有结束符,new String(buf)会不会出错?
答案是:不会 看例子: 这和c是不一样的,不需要特别的在字符串后面添加一个\0结束....

鸿蒙harmonyos next flutter混合开发之开发plugin(获取操作系统版本号)
创建Plugin为my_plugin flutter create --org com.example --templateplugin --platformsandroid,ios,ohos my_plugin 创建Application为my_application flutter create --org com.example my_application flutter_application引用flutter_plugin,在pubspec.yam…...

介绍一款开源的 Modern GUI PySide6 / PyQt6的使用
首先附上大神的开源地址(自行克隆吧): https://github.com/Wanderson-Magalhaes/Modern_GUI_PyDracula_PySide6_or_PyQt6 步骤一:安装PySide6库 pip install PySide6 步骤二:运行main文件 python main.py 就得…...

【大模型】AI数据基础设施的对象存储
官网地址: MinIO | S3 Compatible Storage for AI Github地址: https://github.com/minio/minio 企业级,并对AI准备就绪的分布式对象存储(一般拿来存模型文件) 部署步骤参考: minio安装部署及…...

【前端工程解耦】使用事件中心实现系统解耦,注册,触发,删除事件
前言 事件中心提供了一种灵活且可扩展的方式来管理事件和处理函数之间的关系,同时保持它们之间的解耦,可以降低系统耦合度,将视图和逻辑拆分出来,还是那句话,如果一个中间件解决不了问题,那就再加一个 废话…...

计算机网络803-(4)网络层
目录 1.虚电路服务 虚电路是逻辑连接 2.数据报服务 3.虚电路服务与数据报服务的对比 二.虚拟互连网络-IP网 1.网络通信问题 2.中间设备 3.网络互连使用路由器 三.分类的 IP 地址 1. IP 地址及其表示方法 2.IP 地址的编址方法 3.分类 IP 地址 (1&#x…...

java速成指南
密码都是 123 适用于php .net 7天转java 【腾讯文档】快速上手培训-阿龙 分享给你多个文件 https://docs.qq.com/s/jUcRQ4VPA4grzx8SPYzrBa 第一节 安装jdk,maven,idea_哔哩哔哩_bilibili...

【Unity】双摄像机叠加渲染
一、前言 之前我在做我的一个Unity项目的时候,需要绘制场景网格的功能,于是就用到了UnityEngine.GL这个图形库来绘制,然后我发现绘制的网格线是渲染在UI之后的,也就是说绘制出来的图形会遮盖在UI上面,也就导致一旦这些…...

web网页项目--用户登录,注册页面代码
index.html <!DOCTYPE html> <html lang"zxx"><head><title>xxx注册</title><!-- Meta tags --><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0&q…...

国外火出圈儿的PM御用AI编程工具Bolt.new效果干不过国产的CodeFlying?号称全新定义全栈开发流程?
不知道大家最近有没有发现国外的很多AI都在挤破脑袋想去提升大模型的编程能力, 离我们最近的是上周Openai 发布的全新模型GPT-4o-Canvas, 拥有超强的代码编写能力。 另外还有LlamaCoder、Cursor、Claude artifacts、Replit... 光是今年一年就推出了好…...

爸妈总说着学门技术,学机器视觉技术确实是一条踏实的生活道路,这条路你走得下去走得通吗?
你爸妈说的对,有一技之长终身受益,人要有一技傍身。学一门技术是稳定职业与生活的基本的保障,但是与其盲目的选择一门技术,都是成年人,不如思考下这门技术给自我带来经济效益,在这一方面可以详细咨询我。 …...

2024互联网下载神器IDM6.42你值得拥有
🔥 互联网下载神器大揭秘!IDM6.42你值得拥有 🚀 Hey,各位小伙伴们,今天我要给你们安利一款我超爱的软件——Internet Download Manager 6.42(简称IDM),这款下载器简直就是下载界的“…...

基于H3C环境的实验——OSPF
目录 实验设备和环境 实验设备 实验环境 实验记录 1、单区域 OSPF基本配置 步骤1:搭建实验环境并完成基本配置 步骤2:检查网络连通性和路由器路由表。 步骤3:配置OSPF 步骤4:检查路由器OSPF邻居状态及路由表 实验设备和环境 实验设备 三台路由器、两台PC、电源线、两…...

java线程池详解
在Java中,线程池是一种重要的多线程处理方式,通过管理和复用线程,提高应用程序的性能和响应速度,减少线程创建和销毁的开销,避免线程数量过多导致系统负载过高的问题。本文将详细介绍Java线程池的概念、核心参数、工作…...

五、创建型(建造者模式)
建造者模式 概念 建造者模式是一种创建型设计模式,通过使用多个简单的对象一步步构建一个复杂的对象。它将一个复杂对象的构建过程与其表示分离,从而使同样的构建过程可以创建不同的表示。 应用场景 复杂对象构建:当一个对象有多个属性&…...

CPU超线程技术是什么,怎么启用超线程技术
超线程技术是一种允许单个物理CPU核心模拟成两个逻辑核心的技术,从而提升处理器的并行性能和效率。以下是对超线程技术的详细介绍: 基本概念:超线程(Hyper-Threading,HT)是Intel公司研发的一种技术&#x…...