生产环境使用boost::fiber
简介
boost::fiber是一类用户级线程,也就是纤程。其提供的例子与实际生产环境相距较远,本文将对其进行一定的改造,将其能够投入到生产环境。
同时由于纤程是具有传染性的,使用纤程的代码里也全部要用纤程封装,本文将对一些组件进行简单封装。
fiber封装
boost::fiber支持设置pthread和fiber的比例是1:n还是m:n,同时也支持设置调度方式是随机调度还是抢占调度。
本文中选择使用抢占式调度,并且是m:n的比例,这种选择适用面更加广。
既然pthread和fiber比例是m:n,那么这个m一般等于逻辑核数量,也就是需要设置fiber调度的线程控制在大小为固定的线程池中。fiber中抢占式调度方式也要求固定的线程池数量,外部前程加入时,可能会影响抢占式调度,即不能在外部线程中调用fiber,不然这个线程就加入到了fiber调度的pthread中了。
这时,需要一个设置一个队列,外部线程往这个队列中添加任务;内部线程池从队列中取任务,同时触发fiber,在fiber中可以继续触发fiber。触发队列、内部队列、工作线程、外部线程的关系如下图所示:

运行逻辑被装箱到一个任务中,然后被添加到任务队列,这一步利用模板和上转型实现,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | class IFiberTask {public:IFiberTask() = default;virtual ~IFiberTask() = default;IFiberTask(const IFiberTask& rhs) = delete;IFiberTask& operator=(const IFiberTask& rhs) = delete;IFiberTask(IFiberTask&& other) = default;IFiberTask& operator=(IFiberTask&& other) = default;virtual void execute() = 0;public:inline static std::atomic_size_t fibers_size {0};
};template <typename Func>
class FiberTask: public IFiberTask {public:explicit FiberTask(Func&& func) :func_{std::move(func)} { }~FiberTask() override = default;FiberTask(const FiberTask& rhs) = delete;FiberTask& operator=(const FiberTask& rhs) = delete;FiberTask(FiberTask&& other) noexcept = default;FiberTask& operator=(FiberTask&& other) noexcept = default;void execute() override {fibers_size.fetch_add(1);func_();fibers_size.fetch_sub(1);}private:Func func_;
};
|
IFiberTask是任务基类,不可拷贝;FiberTask是模板类,成员变量func_存储算子。使用IFiberTask类指针指向特化后的FiberTask对象,这时就实现的装箱操作,调用execute时,实际调用了子类的execute,触发封装的func_对象。
外部队列基于boost::fibers::buffered_channel实现,这是一个支持并发的队列,队列的元素类型为std::tuple<boost::fibers::launch, std::unique_ptr>,其中tuple第一元素存储任务的触发形式,进入队列还是立即触发。
接着是任务装箱,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | template<typename Func, typename... Args>
auto Submit(boost::fibers::launch launch_policy, Func&& func, Args&&... args) {// 捕获lambda极其参数auto capture = [func = std::forward<Func>(func),args = std::make_tuple(std::forward<Args>(args)...)]() mutable {return std::apply(std::move(func), std::move(args));};// 任务的返回值类型using task_result_t = std::invoke_result_t<decltype(capture)>;// 该任务packaged_task的using packaged_task_t = boost::fibers::packaged_task<task_result_t()>;// 创建任务对象packaged_task_t task {std::move(capture)};// 装箱到FiberTask中using task_t = fiber::FiberTask<packaged_task_t>;// 获取packaged_task的futureauto result_future = task.get_future();// 添加到buffered_channel中auto status = work_queue_.push(std::make_tuple(launch_policy, std::make_unique<task_t>(std::move(task))));if (status != boost::fibers::channel_op_status::success) {return std::optional<std::decay_t<decltype(result_future)>> {};}return std::make_optional(std::move(result_future));
}
|
代码中,先捕获lambda表达式及其参数,获取返回值类型并添加到packaged_task中,然后装箱到FiberTask中,使用packaged_task获取future并返回,FiberTask对象添加到队列中,使用IFiberTask的指针指向这个对象,实现装箱操作。
接着是内部任务触发的逻辑,首先创建一个线程池,每个线程注册调度器,接着从队列中获取任务,触发fiber。
工作线程的执行函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | // 注册调度算法为抢占式调度
boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(threads_size_, true);
// 创建task类型
auto task_tuple = typename decltype(work_queue_)::value_type {};// 从队列中获取任务
while(boost::fibers::channel_op_status::success == work_queue_.pop(task_tuple)) {// 解包auto& [launch_policy, task_to_run] = task_tuple;// 触发 fiber并detachboost::fibers::fiber(launch_policy, [task = std::move(task_to_run)]() {task->execute();}).detach();
}
|
抢占式调度在注册时需要指定线程池大小,这时不能在外部线程中调用fiber,因为调用fiber的时候会把该线程添加到fiber调度的线程中,也就调整了fiber的worker线程数量。
以上代码实现了fiber触发器、任务队列、工作线程池等逻辑。
理论上可以创建多个fiber调度组件对象,每个组件根据自己的需要设置资源情况。
但实际应用中,还是建议使用一个全局调度组件,因为当A调度器中的任务依赖B调度器的任务的同时,就会出现阻塞工作线程,影响实际性能。
下面封装一个全局调度器,提供递交任务的接口和结束调度的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | class DefaultPool {private:static auto* Pool() {const static size_t size = std::thread::hardware_concurrency();static fiber::FiberPool pool(size, size*8);return &pool;}public:template<typename Func, typename... Args>static auto SubmitJob(boost::fibers::launch launch_policy, Func &&func, Args &&... args) {return Pool()->Submit(launch_policy, std::forward<Func>(func), std::forward<Args>(args)...);}template<typename Func, typename... Args>static auto SubmitJob(Func &&func, Args &&... args) {return Pool()->Submit(std::forward<Func>(func), std::forward<Args>(args)...);}static void Close() {Pool()->CloseQueue();}private:DefaultPool() = default;
};
|
其他组件封装
上面对boost::fiber进行封装,得到一个能投入生产环境的调度器。
但是仅仅是这些是不够的,毕竟对于生产环境中的服务而言,外部服务、中间件的依赖是不能少的。
纤程是具有传染性的,对于外部组件提供的sdk,发送请求并进行同步等待会阻塞纤程对应的工作线程,影响整套机制。
为此,需要对现有的组件进行封装,对于同步接口,需要使用线程池配合fiber::promise;对于异步接口,可以改造成fiber::promise、future机制。下面介绍几种常用组件的fiber封装。
redis客户端封装
同步接口加线程池的方式将同步接口改造成异步接口的方案,存在较大的安全隐患。
线程池的容量不可控,当流量突然增加时,需要大量线程去等待,从而耗尽线程池资源,造成任务大量积压,服务崩溃。
而扩大线程池数量,又消耗了大量的资源。
综上,对于fiber化封装,还是建议采用异步接口。hiredis库支持异步接口,redis_plus_plus库对hiredis进行了c++封装,同时也提供了异步接口,本节将面向这个接口进行改造。
redis提供了挺多的接口,这里只对del、get、set三个接口做个示范:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | template<typename Type>
using Promise = boost::fibers::promise<Type>;template<typename Type>
using Future = boost::fibers::future<Type>;Future<long long > Del(const StringView &key) {auto promise = std::make_unique<Promise<long long >>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.del(key, [promise =promise.release()](sw::redis::Future<long long > &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}Future<OptionalString> Get(const StringView &key) {auto promise = std::make_unique<Promise<OptionalString>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.get(key, [promise = promise.release()](sw::redis::Future<OptionalString> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}Future<bool> Set(const StringView &key, const StringView &val) {auto promise = std::make_unique<Promise<bool>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.set(key, val, [promise = promise.release()](sw::redis::Future<bool> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}
|
注意,redis_plus_plus对每个回调函数通过模板进行判断,因此无法使用mutable+移动捕获promise,只能使用指针赋值的方式实现。redis_plus_plus在1.3.6以后的版本才有回调函数机制,之前的版本不支持。
上面原理是,创建fiber的promise和future,然后让redis的回调函数中捕获promise,并在promise中对数据进行赋值。而外部使用fiber的future进行等待,并不会阻塞工作线程。
grpc客户端封装
跟上面的redis客户端类似,这里也建议对grpc的异步客户端进行改造,支持fiber的promise、future机制。
grpc的异步客户端需要牵扯到grpc::CompletionQueue,里面实现了一套poll engine,需要绑定一个线程去进行epoll_wait操作。首先定义一个GrpcClient类,包含四个成员变量、两个成员函数,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | class GrpcClient {public:explicit GrpcClient(const ClientOption& option);~GrpcClient();// 对外提供的接口Future<meta::HelloResponse> Call(const meta::HelloRequest& request);private:// worker线程执行的逻辑void Work();private:std::unique_ptr<grpc::CompletionQueue> completion_queue_;std::thread worker_;std::shared_ptr<grpc::Channel> channel_;gpr_timespec timespec_{};
};
|
异步客户端分为三个部分逻辑,第一个是请求发送(Call函数),第二个是io线程批量处理,第三个是外部等待Future。
为了能够让io线程里给Promise进行赋值,需要Call函数中将Promise及其相关上下文传递到io线程中,这里定义一个上下文结构体:
1 2 3 4 5 6 | struct CallData {grpc::ClientContext context; // grpc上下文Promise<meta::HelloResponse> promise; // Promise对象grpc::Status status; // grpc调用状态meta::HelloResponse response; // 相应包
};
|
Call函数中的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 | // 创建上下文对象 auto data = new CallData; // 设置超时时间 data->context.set_deadline(timespec_); // 创建桩 meta::HelloService::Stub stub(channel_); auto future = data->promise.get_future(); // 异步调用,添加到完成队列中 auto rpc = stub.Asynchello(&data->context, request, completion_queue_.get()); // 绑定response、status,并将上下文对象作为tag传下去 rpc->Finish(&data->response, &data->status, reinterpret_cast<void*>(data)); return future; |
data对象在该函数中创建,在Work函数中释放,不存在内存泄漏问题。
grpc的异步稍微有点麻烦,发送之后,还要绑定数据。
接着是Work线程中的逻辑了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | CallData* data = nullptr;
bool ok = false;
// 获取状态完毕的数据
while (completion_queue_->Next((void**)&data, &ok)) {// 判断队列是否已经结束if (!ok) {break;}// 如果grpc状态ok,则赋值if (data->status.ok()) {data->promise.set_value(std::move(data->response));} else {// 否则设置异常data->promise.set_exception(std::make_exception_ptr(std::runtime_error(data->status.error_message())));}// 删除数据delete data;data = nullptr;
}
|
调用完成队列的Next函数会阻塞,如果队列中存在状态达到最终状态的数据,则返回一条。从完成对于中取到的数据的顺序与入队顺序不同。
上面两个函数组合实现了Future获取和Promise赋值的操作,使得grpc客户端能在fiber中使用。
参考
相关文章:
生产环境使用boost::fiber
简介 boost::fiber是一类用户级线程,也就是纤程。其提供的例子与实际生产环境相距较远,本文将对其进行一定的改造,将其能够投入到生产环境。 同时由于纤程是具有传染性的,使用纤程的代码里也全部要用纤程封装,本文将对…...
TSINGSEE青犀AI视频识别技术+危化安全生产智慧监管方案
一、背景分析 石油与化学工业生产过程复杂多样,涉及的物料易燃易爆、有毒有害,生产条件多高温高压、低温负压,现场危险化学品存储量大、危险源集中,重特大安全事故多发。打造基于工业互联网的安全生产新型能力,提高危…...
小程序request请求封装
以上为本人的项目目录 1.首先在utils中创建request.js文件封装request请求,此封装带上了token,每次请求都会自带token,需要你从后端获取后利用wx.setStorageSync(token,返回的token),不使用的话就是空。 直接复制即可,需要改一下…...
Easy Javadoc插件的使用教程
目录 一、安装Easy Javadoc插件 二、配置注释模板 三、配置翻译 一、安装Easy Javadoc插件 在idea的File-Settings-Plugins中搜索Easy Javadoc插件,点击install进行安装,安装完成后需要restart IDE,重启后插件生效。 二、配置注释模板 …...
一篇文章让你弄懂Java中的方法
目录 1. 方法概念及使用 1.1 什么是方法(method) 1.2 方法定义 1.3 方法调用的执行过程 1.4 实参和形参的关系 1.5 没有返回值的方法 2. 方法重载 2.1 为什么需要方法重载 2.2 方法重载概念 2.3 方法签名 1. 方法概念及使用 1.1 什么是方法(method) 方法就是一…...
SAP MM学习笔记39 - MRP(资材所要量计划)
这一章开始,离开请求书,学点儿新知识啦。 MRP ( Material Requirement Planning ) - 资材所要量计划。 它的位置在下面的调达周期图上来看,就是右上角的 所要量决定那块儿。 1,MRP(资材所要量计划) 的概要 MRP 的主要目的就是 确…...
总线类设备驱动——IIC
目录 一、本章目标 二、IIC设备驱动 2.1 I2C协议简介 2.2 LinuxI2C驱动 2.3 I2C 设备驱动实例 一、本章目标 一条总线可以将多个设备连接在一起,提高了系统的可扩展性能。这个互联的系统通常由三部分组成:总线控制器、物理总线(一组信号线) 和设备。总线控制器…...
MES 的价值点之动态调度
随着数字化技术的发展,为制造企业的生产计划提供了更多的便利。但在实际生产管理过程中,企业的生产计划不管做的多么理想,还是可能会因诸多的扰动因素造成执行与计划差异,这时就需要通过一些动态调整方案去适应新的生产要求与环境…...
dfs序及相关例题
常用的三种dfs序 欧拉序 每经过一次该点记录一次的序列。 dfs序 记录入栈和出栈的序列。 dfn序 只记录入栈的序列。 dfs序 DFS 序列是指 DFS 调用过程中访问的节点编号的序列。 如何求dfs序?可以用以下代码来找dfs序。 vector<vector<int>> g(n…...
python入门实战:爬取图片到本地
简单记录一下爬取网站图片保存到本地指定目录过程,希望对刚入门的小伙伴有所帮助! 目标网站就是下图所示页面: 实现步骤: 1.爬取每页的图片地址集合 2.下载图片到本地 3. 获取指定页数的页面路径 以下是实现代码: import bs4 import requests import os # 下…...
day02 矩阵 2023.10.26
1.矩阵 2.矩阵乘法 3.特殊矩阵 4.逆矩阵 5.正交矩阵 6.几何意义 7.齐次坐标 8.平移矩阵 9.旋转矩阵 10.缩放矩阵 11.复合运算...
浪潮信息inMerge超融合 刷新全球vSAN架构虚拟化VMmark最佳成绩
近日,在国际权威的VMmark测试中,浪潮信息inMerge1100超融合产品搭载NF5280M7服务器,满载运行44Tiles取得40.95分的成绩,刷新了vSAN架构(Intel双路最新平台)虚拟化性能测试纪录。该测试结果证明inMerge1100可…...
【【哈希应用】位图/布隆过滤器】
位图/布隆过滤器 位图位图概念位图的使用位图模拟实现 布隆过滤器布隆过滤器概念布隆过滤器的使用布隆过滤器模拟实现 位图/布隆过滤器应用:海量数据处理哈希切分 位图 位图概念 计算机中通常以位bit为数据最小存储单位,只有0、1两种二进制状态&#x…...
OpenCV学习笔记
一、OpenCV基础 (一)图像的读取、显示、创建 https://mp.weixin.qq.com/s?__bizMzA4MTA1NjM5NQ&mid2247485202&idx1&sn05d0b4cd25675a99357910a5f2694508&chksm9f9b80f6a8ec09e03ab2bb518ea6aad83db007c9cdd602c7459ed75c737e380ac9c3…...
idea 一键部署jar包
上传成功...
16、SpringCloud -- 常见的接口防刷限流方式
目录 接口防刷限流方式1:隐藏秒杀地址需求:思路:代码:前端:后端:测试:总结:方式2:图形验证码1、生成图形验证码需求:思路:代码:前端:后端:测试:2、校验验证码需求:思路:代码:...
Typora(morkdown编辑器)的安装包和安装教程
Typora(morkdown编辑器)的安装包和安装教程 下载安装1、覆盖文件2、输入序列号①打开 typora ,点击“输入序列号”:②邮箱一栏中任意填写(但须保证邮箱地址格式正确),输入序列号,点击…...
服务器不稳定对网站有什么影响
世界上最远的距离,不是树枝无法相依,而是相互了望的星星,却没有交汇的轨迹。 现代技术的进步,导致了人与人之间距 离的消除,直播行业的快速发展的影响和渗透进如今的日常生活,为人们在遥远的距离相见与互诉…...
py实现surf特征提取
import cv2def main():# 加载图像image1 cv2.imread(image1.jpg, cv2.IMREAD_GRAYSCALE)image2 cv2.imread(image2.jpg, cv2.IMREAD_GRAYSCALE)# 创建SURF对象surf cv2.xfeatures2d.SURF_create()# 检测特征点和描述符keypoints1, descriptors1 surf.detectAndCompute(imag…...
MS39233三个半桥驱动器可兼容TMC6300
MS39233 是一款低压三个半桥驱动器。可兼容 TMC6300(功能基本一致,管脚不兼容)。它可应用于低电压及电池供电的运动控制场合。并且内置电荷泵来提供内部功率 NMOS 所需的栅驱动电压。 MS39233 可以提供最高 2.8A 的峰值电流,其功率…...
LSPosed-Irena框架深度解析:构建下一代Android Hook框架的完整指南
LSPosed-Irena框架深度解析:构建下一代Android Hook框架的完整指南 【免费下载链接】LSPosed-Irena Useless LSPosed Framework Fork 项目地址: https://gitcode.com/gh_mirrors/ls/LSPosed-Irena LSPosed-Irena是一个基于LSPlant的ART hooking框架ÿ…...
告别手动启动:教你写一个ROS2 Launch文件,一键运行robot_state_publisher和rviz2显示URDF
ROS2高效开发指南:用Launch文件一键启动机器人可视化系统 每次调试URDF模型都要重复输入一堆命令?手动启动robot_state_publisher、joint_state_publisher和rviz2节点不仅浪费时间,还容易遗漏参数。本文将带你深度掌握ROS2 Launch文件的编写…...
AI净界RMBG-1.4快速上手指南:小白也能轻松搞定透明素材
AI净界RMBG-1.4快速上手指南:小白也能轻松搞定透明素材 1. 为什么你需要这个工具 如果你曾经尝试过用传统软件抠图,一定遇到过这些烦恼:发丝边缘总是有残留背景色、半透明物体抠出来像蒙了一层雾、宠物毛发看起来像被啃过一样参差不齐。AI净…...
别再死记硬背TTS原理了!用Python+TensorFlow复现一个简易Deep Voice,从音素到语音全流程拆解
用PythonTensorFlow实战Deep Voice:从音素到语音的完整实现指南 当你第一次听到计算机生成的语音时,是否好奇过这背后的魔法是如何实现的?现代文本转语音(TTS)系统已经能够产生几乎与真人无异的语音,而Deep Voice作为早期端到端TT…...
PyArmor解包终极指南:3种高效逆向分析技巧快速掌握代码解密核心技术
PyArmor解包终极指南:3种高效逆向分析技巧快速掌握代码解密核心技术 【免费下载链接】PyArmor-Unpacker A deobfuscator for PyArmor. 项目地址: https://gitcode.com/gh_mirrors/py/PyArmor-Unpacker PyArmor-Unpacker是一个专为Python开发者和安全研究人员…...
纹理识别必备!5个高质量数据集下载与使用指南(附避坑技巧)
纹理识别实战指南:五大高价值数据集深度解析与应用技巧 纹理识别作为计算机视觉领域的重要分支,在工业质检、自动驾驶、医疗影像等场景中发挥着关键作用。但许多开发者在数据集获取和预处理阶段就会遇到各种"暗坑"——从下载链接失效到标注格式…...
攻克Godot资源提取难题:godot-unpacker工具的创新解法
攻克Godot资源提取难题:godot-unpacker工具的创新解法 【免费下载链接】godot-unpacker godot .pck unpacker 项目地址: https://gitcode.com/gh_mirrors/go/godot-unpacker 问题:为什么普通解压工具无法胜任PCK文件提取? Godot引擎打…...
SMT贴片机核心构造与PCB组装效率提升全解析
1. SMT贴片机核心构造解析 SMT贴片机作为电子制造产线的"心脏",其构造精密程度直接决定了PCB组装的效率和质量。现代贴片机就像一台高度智能化的机器人,由机械系统、电子控制系统和视觉系统三大部分组成。我拆解过不少机型,发现它们…...
避坑指南:GD32F407移植FATFS到SD卡,这几个STM32老司机常踩的坑你别再跳了
GD32F407 FATFS移植避坑实战:STM32老手最容易忽略的5个硬件差异 从STM32切换到GD32F407的开发者,往往带着"Pin to Pin兼容"的预期开始SD卡文件系统移植,却在调试阶段遭遇各种诡异问题。上周一位资深工程师向我展示了他的调试记录&a…...
DriverStore Explorer完全指南:免费Windows驱动管理终极教程
DriverStore Explorer完全指南:免费Windows驱动管理终极教程 【免费下载链接】DriverStoreExplorer Driver Store Explorer [RAPR] 项目地址: https://gitcode.com/gh_mirrors/dr/DriverStoreExplorer DriverStore Explorer是一款功能强大的Windows驱动程序管…...
