生产环境使用boost::fiber
简介
boost::fiber是一类用户级线程,也就是纤程。其提供的例子与实际生产环境相距较远,本文将对其进行一定的改造,将其能够投入到生产环境。
同时由于纤程是具有传染性的,使用纤程的代码里也全部要用纤程封装,本文将对一些组件进行简单封装。
fiber封装
boost::fiber支持设置pthread和fiber的比例是1:n还是m:n,同时也支持设置调度方式是随机调度还是抢占调度。
本文中选择使用抢占式调度,并且是m:n的比例,这种选择适用面更加广。
既然pthread和fiber比例是m:n,那么这个m一般等于逻辑核数量,也就是需要设置fiber调度的线程控制在大小为固定的线程池中。fiber中抢占式调度方式也要求固定的线程池数量,外部前程加入时,可能会影响抢占式调度,即不能在外部线程中调用fiber,不然这个线程就加入到了fiber调度的pthread中了。
这时,需要一个设置一个队列,外部线程往这个队列中添加任务;内部线程池从队列中取任务,同时触发fiber,在fiber中可以继续触发fiber。触发队列、内部队列、工作线程、外部线程的关系如下图所示:

运行逻辑被装箱到一个任务中,然后被添加到任务队列,这一步利用模板和上转型实现,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | class IFiberTask {public:IFiberTask() = default;virtual ~IFiberTask() = default;IFiberTask(const IFiberTask& rhs) = delete;IFiberTask& operator=(const IFiberTask& rhs) = delete;IFiberTask(IFiberTask&& other) = default;IFiberTask& operator=(IFiberTask&& other) = default;virtual void execute() = 0;public:inline static std::atomic_size_t fibers_size {0};
};template <typename Func>
class FiberTask: public IFiberTask {public:explicit FiberTask(Func&& func) :func_{std::move(func)} { }~FiberTask() override = default;FiberTask(const FiberTask& rhs) = delete;FiberTask& operator=(const FiberTask& rhs) = delete;FiberTask(FiberTask&& other) noexcept = default;FiberTask& operator=(FiberTask&& other) noexcept = default;void execute() override {fibers_size.fetch_add(1);func_();fibers_size.fetch_sub(1);}private:Func func_;
};
|
IFiberTask是任务基类,不可拷贝;FiberTask是模板类,成员变量func_存储算子。使用IFiberTask类指针指向特化后的FiberTask对象,这时就实现的装箱操作,调用execute时,实际调用了子类的execute,触发封装的func_对象。
外部队列基于boost::fibers::buffered_channel实现,这是一个支持并发的队列,队列的元素类型为std::tuple<boost::fibers::launch, std::unique_ptr>,其中tuple第一元素存储任务的触发形式,进入队列还是立即触发。
接着是任务装箱,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | template<typename Func, typename... Args>
auto Submit(boost::fibers::launch launch_policy, Func&& func, Args&&... args) {// 捕获lambda极其参数auto capture = [func = std::forward<Func>(func),args = std::make_tuple(std::forward<Args>(args)...)]() mutable {return std::apply(std::move(func), std::move(args));};// 任务的返回值类型using task_result_t = std::invoke_result_t<decltype(capture)>;// 该任务packaged_task的using packaged_task_t = boost::fibers::packaged_task<task_result_t()>;// 创建任务对象packaged_task_t task {std::move(capture)};// 装箱到FiberTask中using task_t = fiber::FiberTask<packaged_task_t>;// 获取packaged_task的futureauto result_future = task.get_future();// 添加到buffered_channel中auto status = work_queue_.push(std::make_tuple(launch_policy, std::make_unique<task_t>(std::move(task))));if (status != boost::fibers::channel_op_status::success) {return std::optional<std::decay_t<decltype(result_future)>> {};}return std::make_optional(std::move(result_future));
}
|
代码中,先捕获lambda表达式及其参数,获取返回值类型并添加到packaged_task中,然后装箱到FiberTask中,使用packaged_task获取future并返回,FiberTask对象添加到队列中,使用IFiberTask的指针指向这个对象,实现装箱操作。
接着是内部任务触发的逻辑,首先创建一个线程池,每个线程注册调度器,接着从队列中获取任务,触发fiber。
工作线程的执行函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | // 注册调度算法为抢占式调度
boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(threads_size_, true);
// 创建task类型
auto task_tuple = typename decltype(work_queue_)::value_type {};// 从队列中获取任务
while(boost::fibers::channel_op_status::success == work_queue_.pop(task_tuple)) {// 解包auto& [launch_policy, task_to_run] = task_tuple;// 触发 fiber并detachboost::fibers::fiber(launch_policy, [task = std::move(task_to_run)]() {task->execute();}).detach();
}
|
抢占式调度在注册时需要指定线程池大小,这时不能在外部线程中调用fiber,因为调用fiber的时候会把该线程添加到fiber调度的线程中,也就调整了fiber的worker线程数量。
以上代码实现了fiber触发器、任务队列、工作线程池等逻辑。
理论上可以创建多个fiber调度组件对象,每个组件根据自己的需要设置资源情况。
但实际应用中,还是建议使用一个全局调度组件,因为当A调度器中的任务依赖B调度器的任务的同时,就会出现阻塞工作线程,影响实际性能。
下面封装一个全局调度器,提供递交任务的接口和结束调度的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | class DefaultPool {private:static auto* Pool() {const static size_t size = std::thread::hardware_concurrency();static fiber::FiberPool pool(size, size*8);return &pool;}public:template<typename Func, typename... Args>static auto SubmitJob(boost::fibers::launch launch_policy, Func &&func, Args &&... args) {return Pool()->Submit(launch_policy, std::forward<Func>(func), std::forward<Args>(args)...);}template<typename Func, typename... Args>static auto SubmitJob(Func &&func, Args &&... args) {return Pool()->Submit(std::forward<Func>(func), std::forward<Args>(args)...);}static void Close() {Pool()->CloseQueue();}private:DefaultPool() = default;
};
|
其他组件封装
上面对boost::fiber进行封装,得到一个能投入生产环境的调度器。
但是仅仅是这些是不够的,毕竟对于生产环境中的服务而言,外部服务、中间件的依赖是不能少的。
纤程是具有传染性的,对于外部组件提供的sdk,发送请求并进行同步等待会阻塞纤程对应的工作线程,影响整套机制。
为此,需要对现有的组件进行封装,对于同步接口,需要使用线程池配合fiber::promise;对于异步接口,可以改造成fiber::promise、future机制。下面介绍几种常用组件的fiber封装。
redis客户端封装
同步接口加线程池的方式将同步接口改造成异步接口的方案,存在较大的安全隐患。
线程池的容量不可控,当流量突然增加时,需要大量线程去等待,从而耗尽线程池资源,造成任务大量积压,服务崩溃。
而扩大线程池数量,又消耗了大量的资源。
综上,对于fiber化封装,还是建议采用异步接口。hiredis库支持异步接口,redis_plus_plus库对hiredis进行了c++封装,同时也提供了异步接口,本节将面向这个接口进行改造。
redis提供了挺多的接口,这里只对del、get、set三个接口做个示范:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | template<typename Type>
using Promise = boost::fibers::promise<Type>;template<typename Type>
using Future = boost::fibers::future<Type>;Future<long long > Del(const StringView &key) {auto promise = std::make_unique<Promise<long long >>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.del(key, [promise =promise.release()](sw::redis::Future<long long > &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}Future<OptionalString> Get(const StringView &key) {auto promise = std::make_unique<Promise<OptionalString>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.get(key, [promise = promise.release()](sw::redis::Future<OptionalString> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}Future<bool> Set(const StringView &key, const StringView &val) {auto promise = std::make_unique<Promise<bool>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.set(key, val, [promise = promise.release()](sw::redis::Future<bool> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}
|
注意,redis_plus_plus对每个回调函数通过模板进行判断,因此无法使用mutable+移动捕获promise,只能使用指针赋值的方式实现。redis_plus_plus在1.3.6以后的版本才有回调函数机制,之前的版本不支持。
上面原理是,创建fiber的promise和future,然后让redis的回调函数中捕获promise,并在promise中对数据进行赋值。而外部使用fiber的future进行等待,并不会阻塞工作线程。
grpc客户端封装
跟上面的redis客户端类似,这里也建议对grpc的异步客户端进行改造,支持fiber的promise、future机制。
grpc的异步客户端需要牵扯到grpc::CompletionQueue,里面实现了一套poll engine,需要绑定一个线程去进行epoll_wait操作。首先定义一个GrpcClient类,包含四个成员变量、两个成员函数,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | class GrpcClient {public:explicit GrpcClient(const ClientOption& option);~GrpcClient();// 对外提供的接口Future<meta::HelloResponse> Call(const meta::HelloRequest& request);private:// worker线程执行的逻辑void Work();private:std::unique_ptr<grpc::CompletionQueue> completion_queue_;std::thread worker_;std::shared_ptr<grpc::Channel> channel_;gpr_timespec timespec_{};
};
|
异步客户端分为三个部分逻辑,第一个是请求发送(Call函数),第二个是io线程批量处理,第三个是外部等待Future。
为了能够让io线程里给Promise进行赋值,需要Call函数中将Promise及其相关上下文传递到io线程中,这里定义一个上下文结构体:
1 2 3 4 5 6 | struct CallData {grpc::ClientContext context; // grpc上下文Promise<meta::HelloResponse> promise; // Promise对象grpc::Status status; // grpc调用状态meta::HelloResponse response; // 相应包
};
|
Call函数中的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 | // 创建上下文对象 auto data = new CallData; // 设置超时时间 data->context.set_deadline(timespec_); // 创建桩 meta::HelloService::Stub stub(channel_); auto future = data->promise.get_future(); // 异步调用,添加到完成队列中 auto rpc = stub.Asynchello(&data->context, request, completion_queue_.get()); // 绑定response、status,并将上下文对象作为tag传下去 rpc->Finish(&data->response, &data->status, reinterpret_cast<void*>(data)); return future; |
data对象在该函数中创建,在Work函数中释放,不存在内存泄漏问题。
grpc的异步稍微有点麻烦,发送之后,还要绑定数据。
接着是Work线程中的逻辑了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | CallData* data = nullptr;
bool ok = false;
// 获取状态完毕的数据
while (completion_queue_->Next((void**)&data, &ok)) {// 判断队列是否已经结束if (!ok) {break;}// 如果grpc状态ok,则赋值if (data->status.ok()) {data->promise.set_value(std::move(data->response));} else {// 否则设置异常data->promise.set_exception(std::make_exception_ptr(std::runtime_error(data->status.error_message())));}// 删除数据delete data;data = nullptr;
}
|
调用完成队列的Next函数会阻塞,如果队列中存在状态达到最终状态的数据,则返回一条。从完成对于中取到的数据的顺序与入队顺序不同。
上面两个函数组合实现了Future获取和Promise赋值的操作,使得grpc客户端能在fiber中使用。
参考
相关文章:
生产环境使用boost::fiber
简介 boost::fiber是一类用户级线程,也就是纤程。其提供的例子与实际生产环境相距较远,本文将对其进行一定的改造,将其能够投入到生产环境。 同时由于纤程是具有传染性的,使用纤程的代码里也全部要用纤程封装,本文将对…...
TSINGSEE青犀AI视频识别技术+危化安全生产智慧监管方案
一、背景分析 石油与化学工业生产过程复杂多样,涉及的物料易燃易爆、有毒有害,生产条件多高温高压、低温负压,现场危险化学品存储量大、危险源集中,重特大安全事故多发。打造基于工业互联网的安全生产新型能力,提高危…...
小程序request请求封装
以上为本人的项目目录 1.首先在utils中创建request.js文件封装request请求,此封装带上了token,每次请求都会自带token,需要你从后端获取后利用wx.setStorageSync(token,返回的token),不使用的话就是空。 直接复制即可,需要改一下…...
Easy Javadoc插件的使用教程
目录 一、安装Easy Javadoc插件 二、配置注释模板 三、配置翻译 一、安装Easy Javadoc插件 在idea的File-Settings-Plugins中搜索Easy Javadoc插件,点击install进行安装,安装完成后需要restart IDE,重启后插件生效。 二、配置注释模板 …...
一篇文章让你弄懂Java中的方法
目录 1. 方法概念及使用 1.1 什么是方法(method) 1.2 方法定义 1.3 方法调用的执行过程 1.4 实参和形参的关系 1.5 没有返回值的方法 2. 方法重载 2.1 为什么需要方法重载 2.2 方法重载概念 2.3 方法签名 1. 方法概念及使用 1.1 什么是方法(method) 方法就是一…...
SAP MM学习笔记39 - MRP(资材所要量计划)
这一章开始,离开请求书,学点儿新知识啦。 MRP ( Material Requirement Planning ) - 资材所要量计划。 它的位置在下面的调达周期图上来看,就是右上角的 所要量决定那块儿。 1,MRP(资材所要量计划) 的概要 MRP 的主要目的就是 确…...
总线类设备驱动——IIC
目录 一、本章目标 二、IIC设备驱动 2.1 I2C协议简介 2.2 LinuxI2C驱动 2.3 I2C 设备驱动实例 一、本章目标 一条总线可以将多个设备连接在一起,提高了系统的可扩展性能。这个互联的系统通常由三部分组成:总线控制器、物理总线(一组信号线) 和设备。总线控制器…...
MES 的价值点之动态调度
随着数字化技术的发展,为制造企业的生产计划提供了更多的便利。但在实际生产管理过程中,企业的生产计划不管做的多么理想,还是可能会因诸多的扰动因素造成执行与计划差异,这时就需要通过一些动态调整方案去适应新的生产要求与环境…...
dfs序及相关例题
常用的三种dfs序 欧拉序 每经过一次该点记录一次的序列。 dfs序 记录入栈和出栈的序列。 dfn序 只记录入栈的序列。 dfs序 DFS 序列是指 DFS 调用过程中访问的节点编号的序列。 如何求dfs序?可以用以下代码来找dfs序。 vector<vector<int>> g(n…...
python入门实战:爬取图片到本地
简单记录一下爬取网站图片保存到本地指定目录过程,希望对刚入门的小伙伴有所帮助! 目标网站就是下图所示页面: 实现步骤: 1.爬取每页的图片地址集合 2.下载图片到本地 3. 获取指定页数的页面路径 以下是实现代码: import bs4 import requests import os # 下…...
day02 矩阵 2023.10.26
1.矩阵 2.矩阵乘法 3.特殊矩阵 4.逆矩阵 5.正交矩阵 6.几何意义 7.齐次坐标 8.平移矩阵 9.旋转矩阵 10.缩放矩阵 11.复合运算...
浪潮信息inMerge超融合 刷新全球vSAN架构虚拟化VMmark最佳成绩
近日,在国际权威的VMmark测试中,浪潮信息inMerge1100超融合产品搭载NF5280M7服务器,满载运行44Tiles取得40.95分的成绩,刷新了vSAN架构(Intel双路最新平台)虚拟化性能测试纪录。该测试结果证明inMerge1100可…...
【【哈希应用】位图/布隆过滤器】
位图/布隆过滤器 位图位图概念位图的使用位图模拟实现 布隆过滤器布隆过滤器概念布隆过滤器的使用布隆过滤器模拟实现 位图/布隆过滤器应用:海量数据处理哈希切分 位图 位图概念 计算机中通常以位bit为数据最小存储单位,只有0、1两种二进制状态&#x…...
OpenCV学习笔记
一、OpenCV基础 (一)图像的读取、显示、创建 https://mp.weixin.qq.com/s?__bizMzA4MTA1NjM5NQ&mid2247485202&idx1&sn05d0b4cd25675a99357910a5f2694508&chksm9f9b80f6a8ec09e03ab2bb518ea6aad83db007c9cdd602c7459ed75c737e380ac9c3…...
idea 一键部署jar包
上传成功...
16、SpringCloud -- 常见的接口防刷限流方式
目录 接口防刷限流方式1:隐藏秒杀地址需求:思路:代码:前端:后端:测试:总结:方式2:图形验证码1、生成图形验证码需求:思路:代码:前端:后端:测试:2、校验验证码需求:思路:代码:...
Typora(morkdown编辑器)的安装包和安装教程
Typora(morkdown编辑器)的安装包和安装教程 下载安装1、覆盖文件2、输入序列号①打开 typora ,点击“输入序列号”:②邮箱一栏中任意填写(但须保证邮箱地址格式正确),输入序列号,点击…...
服务器不稳定对网站有什么影响
世界上最远的距离,不是树枝无法相依,而是相互了望的星星,却没有交汇的轨迹。 现代技术的进步,导致了人与人之间距 离的消除,直播行业的快速发展的影响和渗透进如今的日常生活,为人们在遥远的距离相见与互诉…...
py实现surf特征提取
import cv2def main():# 加载图像image1 cv2.imread(image1.jpg, cv2.IMREAD_GRAYSCALE)image2 cv2.imread(image2.jpg, cv2.IMREAD_GRAYSCALE)# 创建SURF对象surf cv2.xfeatures2d.SURF_create()# 检测特征点和描述符keypoints1, descriptors1 surf.detectAndCompute(imag…...
MS39233三个半桥驱动器可兼容TMC6300
MS39233 是一款低压三个半桥驱动器。可兼容 TMC6300(功能基本一致,管脚不兼容)。它可应用于低电压及电池供电的运动控制场合。并且内置电荷泵来提供内部功率 NMOS 所需的栅驱动电压。 MS39233 可以提供最高 2.8A 的峰值电流,其功率…...
让旧款iPhone/iPad重获新生:Legacy-iOS-Kit终极使用指南
让旧款iPhone/iPad重获新生:Legacy-iOS-Kit终极使用指南 【免费下载链接】Legacy-iOS-Kit An all-in-one tool to restore/downgrade, save SHSH blobs, jailbreak legacy iOS devices, and more 项目地址: https://gitcode.com/gh_mirrors/le/Legacy-iOS-Kit …...
【免费下载】 STM32Cube_FW_F4_V1.16.0 固件库
STM32Cube_FW_F4_V1.16.0 固件库 【下载地址】STM32Cube_FW_F4_V1.16.0固件库 本仓库提供了STM32CubeFW_F4_V1.16.0固件包的直接下载资源。STM32Cube是一个完整的软件平台,旨在支持STMicroelectronics(意法半导体)的STM32系列微控制器。这个特…...
SQL注入技术详解:从联合查询到盲注实战
前言: 继续开始我们的SQL注入吧!本文详细讲解SQL注入的各类技术,包括联合查询、报错注入、布尔盲注、时间盲注、UA注入、Referer注入等,涵盖漏洞判断、利用方法和实战步骤。内容基于MySQL 5.0以上环境,围绕information…...
告别RaiDrive广告!用rclone+Alist免费打造Windows云盘本地文件夹(含开机自启脚本)
开源云盘本地化方案:Alist与rclone的无缝整合指南 在数字资产管理日益重要的今天,云存储已成为个人和企业不可或缺的工具。然而,商业软件的广告推送、订阅费用和功能限制常常让用户感到困扰。本文将介绍一套完全开源、零成本的解决方案&#…...
无人机巡检避坑指南:用YOLOv5n做罂粟识别,这些光照和遮挡问题怎么解决?
无人机巡检实战:YOLOv5n在复杂环境下的罂粟识别优化策略 清晨的露珠还挂在叶片上,无人机已经盘旋在田野上空。对于从事智能巡检的工程师来说,这样的场景再熟悉不过——但随之而来的挑战也令人头疼:强烈的晨光让部分区域过曝&#…...
STM32F103C8T6最小系统板避坑指南:从ST-LINK接线到Keil5乱码,新手必看的5个实战问题
STM32F103C8T6最小系统板避坑指南:从ST-LINK接线到Keil5乱码,新手必看的5个实战问题 第一次点亮STM32开发板的LED时,那种成就感就像电子工程师的"成人礼"。但通往成功的路上往往布满荆棘——接错一根线可能导致整晚的调试失败&…...
冥想第一千八百八十二天(1882)
1.周六,醒的很早,然后去锦和公园转了一圈,一直在等待大雨,结果到了傍晚才下,浪费了一天,不过天气很不好,就不适合外出了。敬畏大自然。 2.感谢父母,感谢朋友,感谢家人&am…...
别再只认Revit了!盘点7种主流BIM数据格式(RVT/IFC/FBX...)的优缺点与选型指南
建筑数字化进阶指南:7大BIM数据格式深度解析与实战选型策略 在建筑信息模型(BIM)与地理信息系统(GIS)加速融合的今天,数据格式的选择直接影响着项目协同效率与成果交付质量。当设计院的Revit模型需要与施工…...
Unity加载倾斜摄影模型踩坑记:从3MX/OSGB文件到流畅渲染,我解决了这几个问题
Unity倾斜摄影模型加载实战:从3MX/OSGB到跨平台渲染的深度解决方案 第一次在Unity中加载倾斜摄影模型时,那种期待和忐忑交织的心情至今难忘。作为智慧城市项目的核心展示环节,我们需要将航拍生成的3MX和OSGB格式模型无缝集成到Unity场景中。本…...
实测对比:百度OCR车牌识别在夜间、侧拍、模糊场景下的效果到底怎么样?
百度OCR车牌识别实战评测:夜间、侧拍与模糊场景下的真实表现 当停车场道闸自动抬起,交通卡口违章记录自动生成,这些看似简单的场景背后都依赖一项关键技术——车牌识别。作为计算机视觉领域的经典应用,车牌识别技术已经从实验室走…...
