当前位置: 首页 > news >正文

生产环境使用boost::fiber

简介

boost::fiber是一类用户级线程,也就是纤程。其提供的例子与实际生产环境相距较远,本文将对其进行一定的改造,将其能够投入到生产环境。
同时由于纤程是具有传染性的,使用纤程的代码里也全部要用纤程封装,本文将对一些组件进行简单封装。

fiber封装

boost::fiber支持设置pthread和fiber的比例是1:n还是m:n,同时也支持设置调度方式是随机调度还是抢占调度。
本文中选择使用抢占式调度,并且是m:n的比例,这种选择适用面更加广。
既然pthread和fiber比例是m:n,那么这个m一般等于逻辑核数量,也就是需要设置fiber调度的线程控制在大小为固定的线程池中。fiber中抢占式调度方式也要求固定的线程池数量,外部前程加入时,可能会影响抢占式调度,即不能在外部线程中调用fiber,不然这个线程就加入到了fiber调度的pthread中了。

这时,需要一个设置一个队列,外部线程往这个队列中添加任务;内部线程池从队列中取任务,同时触发fiber,在fiber中可以继续触发fiber。触发队列、内部队列、工作线程、外部线程的关系如下图所示:

图片

运行逻辑被装箱到一个任务中,然后被添加到任务队列,这一步利用模板和上转型实现,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class IFiberTask {public:IFiberTask() = default;virtual ~IFiberTask() = default;IFiberTask(const IFiberTask& rhs) = delete;IFiberTask& operator=(const IFiberTask& rhs) = delete;IFiberTask(IFiberTask&& other) = default;IFiberTask& operator=(IFiberTask&& other) = default;virtual void execute() = 0;public:inline static std::atomic_size_t fibers_size {0};
};template <typename Func>
class FiberTask: public IFiberTask {public:explicit FiberTask(Func&& func) :func_{std::move(func)} { }~FiberTask() override = default;FiberTask(const FiberTask& rhs) = delete;FiberTask& operator=(const FiberTask& rhs) = delete;FiberTask(FiberTask&& other)  noexcept = default;FiberTask& operator=(FiberTask&& other)  noexcept = default;void execute() override {fibers_size.fetch_add(1);func_();fibers_size.fetch_sub(1);}private:Func func_;
};

IFiberTask是任务基类,不可拷贝;FiberTask是模板类,成员变量func_存储算子。使用IFiberTask类指针指向特化后的FiberTask对象,这时就实现的装箱操作,调用execute时,实际调用了子类的execute,触发封装的func_对象。

外部队列基于boost::fibers::buffered_channel实现,这是一个支持并发的队列,队列的元素类型为std::tuple<boost::fibers::launch, std::unique_ptr>,其中tuple第一元素存储任务的触发形式,进入队列还是立即触发。

接着是任务装箱,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template<typename Func, typename... Args>
auto Submit(boost::fibers::launch launch_policy, Func&& func, Args&&... args) {// 捕获lambda极其参数auto capture = [func = std::forward<Func>(func),args = std::make_tuple(std::forward<Args>(args)...)]() mutable {return std::apply(std::move(func), std::move(args));};// 任务的返回值类型using task_result_t = std::invoke_result_t<decltype(capture)>;// 该任务packaged_task的using packaged_task_t = boost::fibers::packaged_task<task_result_t()>;// 创建任务对象packaged_task_t task {std::move(capture)};// 装箱到FiberTask中using task_t = fiber::FiberTask<packaged_task_t>;// 获取packaged_task的futureauto result_future = task.get_future();// 添加到buffered_channel中auto status = work_queue_.push(std::make_tuple(launch_policy, std::make_unique<task_t>(std::move(task))));if (status != boost::fibers::channel_op_status::success) {return std::optional<std::decay_t<decltype(result_future)>> {};}return std::make_optional(std::move(result_future));
}

代码中,先捕获lambda表达式及其参数,获取返回值类型并添加到packaged_task中,然后装箱到FiberTask中,使用packaged_task获取future并返回,FiberTask对象添加到队列中,使用IFiberTask的指针指向这个对象,实现装箱操作。

接着是内部任务触发的逻辑,首先创建一个线程池,每个线程注册调度器,接着从队列中获取任务,触发fiber。
工作线程的执行函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 注册调度算法为抢占式调度
boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(threads_size_, true);
// 创建task类型
auto task_tuple = typename decltype(work_queue_)::value_type {};// 从队列中获取任务
while(boost::fibers::channel_op_status::success == work_queue_.pop(task_tuple)) {// 解包auto& [launch_policy, task_to_run] = task_tuple;// 触发 fiber并detachboost::fibers::fiber(launch_policy, [task = std::move(task_to_run)]() {task->execute();}).detach();
}

抢占式调度在注册时需要指定线程池大小,这时不能在外部线程中调用fiber,因为调用fiber的时候会把该线程添加到fiber调度的线程中,也就调整了fiber的worker线程数量。

以上代码实现了fiber触发器、任务队列、工作线程池等逻辑。
理论上可以创建多个fiber调度组件对象,每个组件根据自己的需要设置资源情况。
但实际应用中,还是建议使用一个全局调度组件,因为当A调度器中的任务依赖B调度器的任务的同时,就会出现阻塞工作线程,影响实际性能。

下面封装一个全局调度器,提供递交任务的接口和结束调度的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class DefaultPool {private:static auto* Pool() {const static size_t size = std::thread::hardware_concurrency();static fiber::FiberPool pool(size, size*8);return &pool;}public:template<typename Func, typename... Args>static auto SubmitJob(boost::fibers::launch launch_policy, Func &&func, Args &&... args) {return Pool()->Submit(launch_policy, std::forward<Func>(func), std::forward<Args>(args)...);}template<typename Func, typename... Args>static auto SubmitJob(Func &&func, Args &&... args) {return Pool()->Submit(std::forward<Func>(func), std::forward<Args>(args)...);}static void Close() {Pool()->CloseQueue();}private:DefaultPool() = default;
};

其他组件封装

上面对boost::fiber进行封装,得到一个能投入生产环境的调度器。
但是仅仅是这些是不够的,毕竟对于生产环境中的服务而言,外部服务、中间件的依赖是不能少的。
纤程是具有传染性的,对于外部组件提供的sdk,发送请求并进行同步等待会阻塞纤程对应的工作线程,影响整套机制。
为此,需要对现有的组件进行封装,对于同步接口,需要使用线程池配合fiber::promise;对于异步接口,可以改造成fiber::promise、future机制。下面介绍几种常用组件的fiber封装。

redis客户端封装

同步接口加线程池的方式将同步接口改造成异步接口的方案,存在较大的安全隐患。
线程池的容量不可控,当流量突然增加时,需要大量线程去等待,从而耗尽线程池资源,造成任务大量积压,服务崩溃。
而扩大线程池数量,又消耗了大量的资源。

综上,对于fiber化封装,还是建议采用异步接口。hiredis库支持异步接口,redis_plus_plus库对hiredis进行了c++封装,同时也提供了异步接口,本节将面向这个接口进行改造。

redis提供了挺多的接口,这里只对del、get、set三个接口做个示范:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
template<typename Type>
using Promise = boost::fibers::promise<Type>;template<typename Type>
using Future = boost::fibers::future<Type>;Future<long long > Del(const StringView &key) {auto promise = std::make_unique<Promise<long long >>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.del(key, [promise =promise.release()](sw::redis::Future<long long > &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}Future<OptionalString> Get(const StringView &key) {auto promise = std::make_unique<Promise<OptionalString>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.get(key, [promise = promise.release()](sw::redis::Future<OptionalString> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}Future<bool> Set(const StringView &key, const StringView &val) {auto promise = std::make_unique<Promise<bool>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.set(key, val, [promise = promise.release()](sw::redis::Future<bool> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}

注意,redis_plus_plus对每个回调函数通过模板进行判断,因此无法使用mutable+移动捕获promise,只能使用指针赋值的方式实现。redis_plus_plus在1.3.6以后的版本才有回调函数机制,之前的版本不支持。
上面原理是,创建fiber的promise和future,然后让redis的回调函数中捕获promise,并在promise中对数据进行赋值。而外部使用fiber的future进行等待,并不会阻塞工作线程。

grpc客户端封装

跟上面的redis客户端类似,这里也建议对grpc的异步客户端进行改造,支持fiber的promise、future机制。
grpc的异步客户端需要牵扯到grpc::CompletionQueue,里面实现了一套poll engine,需要绑定一个线程去进行epoll_wait操作。首先定义一个GrpcClient类,包含四个成员变量、两个成员函数,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class GrpcClient {public:explicit GrpcClient(const ClientOption& option);~GrpcClient();// 对外提供的接口Future<meta::HelloResponse> Call(const meta::HelloRequest& request);private:// worker线程执行的逻辑void Work();private:std::unique_ptr<grpc::CompletionQueue> completion_queue_;std::thread worker_;std::shared_ptr<grpc::Channel> channel_;gpr_timespec timespec_{};
};

异步客户端分为三个部分逻辑,第一个是请求发送(Call函数),第二个是io线程批量处理,第三个是外部等待Future。
为了能够让io线程里给Promise进行赋值,需要Call函数中将Promise及其相关上下文传递到io线程中,这里定义一个上下文结构体:

1
2
3
4
5
6
struct CallData {grpc::ClientContext context;          // grpc上下文Promise<meta::HelloResponse> promise; // Promise对象grpc::Status status;                  // grpc调用状态meta::HelloResponse response;         // 相应包
};

Call函数中的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
// 创建上下文对象
auto data = new CallData;
// 设置超时时间
data->context.set_deadline(timespec_);
// 创建桩
meta::HelloService::Stub stub(channel_);
auto future = data->promise.get_future();
// 异步调用,添加到完成队列中
auto rpc = stub.Asynchello(&data->context, request, completion_queue_.get());
// 绑定response、status,并将上下文对象作为tag传下去
rpc->Finish(&data->response, &data->status, reinterpret_cast<void*>(data));
return future;

data对象在该函数中创建,在Work函数中释放,不存在内存泄漏问题。
grpc的异步稍微有点麻烦,发送之后,还要绑定数据。
接着是Work线程中的逻辑了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CallData* data = nullptr;
bool ok = false;
// 获取状态完毕的数据
while (completion_queue_->Next((void**)&data, &ok)) {// 判断队列是否已经结束if (!ok) {break;}// 如果grpc状态ok,则赋值if (data->status.ok()) {data->promise.set_value(std::move(data->response));} else {// 否则设置异常data->promise.set_exception(std::make_exception_ptr(std::runtime_error(data->status.error_message())));}// 删除数据delete data;data = nullptr;
}

调用完成队列的Next函数会阻塞,如果队列中存在状态达到最终状态的数据,则返回一条。从完成对于中取到的数据的顺序与入队顺序不同。

上面两个函数组合实现了Future获取和Promise赋值的操作,使得grpc客户端能在fiber中使用。

参考

  • fiberpool代码
  • 生产环境使用fiber
  • grpc异步客户端
  • hiredis
  • 生产环境使用boost::fiber

相关文章:

生产环境使用boost::fiber

简介 boost::fiber是一类用户级线程&#xff0c;也就是纤程。其提供的例子与实际生产环境相距较远&#xff0c;本文将对其进行一定的改造&#xff0c;将其能够投入到生产环境。 同时由于纤程是具有传染性的&#xff0c;使用纤程的代码里也全部要用纤程封装&#xff0c;本文将对…...

TSINGSEE青犀AI视频识别技术+危化安全生产智慧监管方案

一、背景分析 石油与化学工业生产过程复杂多样&#xff0c;涉及的物料易燃易爆、有毒有害&#xff0c;生产条件多高温高压、低温负压&#xff0c;现场危险化学品存储量大、危险源集中&#xff0c;重特大安全事故多发。打造基于工业互联网的安全生产新型能力&#xff0c;提高危…...

小程序request请求封装

以上为本人的项目目录 1.首先在utils中创建request.js文件封装request请求&#xff0c;此封装带上了token&#xff0c;每次请求都会自带token&#xff0c;需要你从后端获取后利用wx.setStorageSync(token,返回的token),不使用的话就是空。 直接复制即可&#xff0c;需要改一下…...

Easy Javadoc插件的使用教程

目录 一、安装Easy Javadoc插件 二、配置注释模板 三、配置翻译 一、安装Easy Javadoc插件 在idea的File-Settings-Plugins中搜索Easy Javadoc插件&#xff0c;点击install进行安装&#xff0c;安装完成后需要restart IDE&#xff0c;重启后插件生效。 二、配置注释模板 …...

一篇文章让你弄懂Java中的方法

目录 1. 方法概念及使用 1.1 什么是方法(method) 1.2 方法定义 1.3 方法调用的执行过程 1.4 实参和形参的关系 1.5 没有返回值的方法 2. 方法重载 2.1 为什么需要方法重载 2.2 方法重载概念 2.3 方法签名 1. 方法概念及使用 1.1 什么是方法(method) 方法就是一…...

SAP MM学习笔记39 - MRP(资材所要量计划)

这一章开始&#xff0c;离开请求书&#xff0c;学点儿新知识啦。 MRP ( Material Requirement Planning ) - 资材所要量计划。 它的位置在下面的调达周期图上来看&#xff0c;就是右上角的 所要量决定那块儿。 1&#xff0c;MRP(资材所要量计划) 的概要 MRP 的主要目的就是 确…...

总线类设备驱动——IIC

目录 一、本章目标 二、IIC设备驱动 2.1 I2C协议简介 2.2 LinuxI2C驱动 2.3 I2C 设备驱动实例 一、本章目标 一条总线可以将多个设备连接在一起&#xff0c;提高了系统的可扩展性能。这个互联的系统通常由三部分组成:总线控制器、物理总线(一组信号线) 和设备。总线控制器…...

MES 的价值点之动态调度

随着数字化技术的发展&#xff0c;为制造企业的生产计划提供了更多的便利。但在实际生产管理过程中&#xff0c;企业的生产计划不管做的多么理想&#xff0c;还是可能会因诸多的扰动因素造成执行与计划差异&#xff0c;这时就需要通过一些动态调整方案去适应新的生产要求与环境…...

dfs序及相关例题

常用的三种dfs序 欧拉序 每经过一次该点记录一次的序列。 dfs序 记录入栈和出栈的序列。 dfn序 只记录入栈的序列。 dfs序 DFS 序列是指 DFS 调用过程中访问的节点编号的序列。 如何求dfs序&#xff1f;可以用以下代码来找dfs序。 vector<vector<int>> g(n…...

python入门实战:爬取图片到本地

简单记录一下爬取网站图片保存到本地指定目录过程,希望对刚入门的小伙伴有所帮助! 目标网站就是下图所示页面: 实现步骤: 1.爬取每页的图片地址集合 2.下载图片到本地 3. 获取指定页数的页面路径 以下是实现代码: import bs4 import requests import os # 下…...

day02 矩阵 2023.10.26

1.矩阵 2.矩阵乘法 3.特殊矩阵 4.逆矩阵 5.正交矩阵 6.几何意义 7.齐次坐标 8.平移矩阵 9.旋转矩阵 10.缩放矩阵 11.复合运算...

浪潮信息inMerge超融合 刷新全球vSAN架构虚拟化VMmark最佳成绩

近日&#xff0c;在国际权威的VMmark测试中&#xff0c;浪潮信息inMerge1100超融合产品搭载NF5280M7服务器&#xff0c;满载运行44Tiles取得40.95分的成绩&#xff0c;刷新了vSAN架构&#xff08;Intel双路最新平台&#xff09;虚拟化性能测试纪录。该测试结果证明inMerge1100可…...

【【哈希应用】位图/布隆过滤器】

位图/布隆过滤器 位图位图概念位图的使用位图模拟实现 布隆过滤器布隆过滤器概念布隆过滤器的使用布隆过滤器模拟实现 位图/布隆过滤器应用&#xff1a;海量数据处理哈希切分 位图 位图概念 计算机中通常以位bit为数据最小存储单位&#xff0c;只有0、1两种二进制状态&#x…...

OpenCV学习笔记

一、OpenCV基础 &#xff08;一&#xff09;图像的读取、显示、创建 https://mp.weixin.qq.com/s?__bizMzA4MTA1NjM5NQ&mid2247485202&idx1&sn05d0b4cd25675a99357910a5f2694508&chksm9f9b80f6a8ec09e03ab2bb518ea6aad83db007c9cdd602c7459ed75c737e380ac9c3…...

idea 一键部署jar包

上传成功...

16、SpringCloud -- 常见的接口防刷限流方式

目录 接口防刷限流方式1:隐藏秒杀地址需求:思路:代码:前端:后端:测试:总结:方式2:图形验证码1、生成图形验证码需求:思路:代码:前端:后端:测试:2、校验验证码需求:思路:代码:...

Typora(morkdown编辑器)的安装包和安装教程

Typora&#xff08;morkdown编辑器&#xff09;的安装包和安装教程 下载安装1、覆盖文件2、输入序列号①打开 typora &#xff0c;点击“输入序列号”&#xff1a;②邮箱一栏中任意填写&#xff08;但须保证邮箱地址格式正确&#xff09;&#xff0c;输入序列号&#xff0c;点击…...

服务器不稳定对网站有什么影响

世界上最远的距离&#xff0c;不是树枝无法相依&#xff0c;而是相互了望的星星&#xff0c;却没有交汇的轨迹。 现代技术的进步&#xff0c;导致了人与人之间距 离的消除&#xff0c;直播行业的快速发展的影响和渗透进如今的日常生活&#xff0c;为人们在遥远的距离相见与互诉…...

py实现surf特征提取

import cv2def main():# 加载图像image1 cv2.imread(image1.jpg, cv2.IMREAD_GRAYSCALE)image2 cv2.imread(image2.jpg, cv2.IMREAD_GRAYSCALE)# 创建SURF对象surf cv2.xfeatures2d.SURF_create()# 检测特征点和描述符keypoints1, descriptors1 surf.detectAndCompute(imag…...

MS39233三个半桥驱动器可兼容TMC6300

MS39233 是一款低压三个半桥驱动器。可兼容 TMC6300&#xff08;功能基本一致&#xff0c;管脚不兼容&#xff09;。它可应用于低电压及电池供电的运动控制场合。并且内置电荷泵来提供内部功率 NMOS 所需的栅驱动电压。 MS39233 可以提供最高 2.8A 的峰值电流&#xff0c;其功率…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

设计模式和设计原则回顾

设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

使用分级同态加密防御梯度泄漏

抽象 联邦学习 &#xff08;FL&#xff09; 支持跨分布式客户端进行协作模型训练&#xff0c;而无需共享原始数据&#xff0c;这使其成为在互联和自动驾驶汽车 &#xff08;CAV&#xff09; 等领域保护隐私的机器学习的一种很有前途的方法。然而&#xff0c;最近的研究表明&…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)

设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile&#xff0c;新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...

如何在最短时间内提升打ctf(web)的水平?

刚刚刷完2遍 bugku 的 web 题&#xff0c;前来答题。 每个人对刷题理解是不同&#xff0c;有的人是看了writeup就等于刷了&#xff0c;有的人是收藏了writeup就等于刷了&#xff0c;有的人是跟着writeup做了一遍就等于刷了&#xff0c;还有的人是独立思考做了一遍就等于刷了。…...

Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?

在大数据处理领域&#xff0c;Hive 作为 Hadoop 生态中重要的数据仓库工具&#xff0c;其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式&#xff0c;很多开发者常常陷入选择困境。本文将从底…...