C++实现线程池
C++实现线程池
- 一、前言
- 二、线程池的接口设计
- 2.1、类封装
- 2.2、线程池的初始化
- 2.3、线程池的启动
- 2.4、线程池的停止
- 2.5、线程的执行函数run()
- 2.6、任务的运行函数
- 2.7、等待所有线程结束
- 三、测试线程池
- 四、源码地址
- 总结
一、前言
C++实现的线程池,可能涉及以下知识点:
- decltype。
- packaged_task。
- make_shared。
- mutex。
- unique_lock。
- notify_one。
- future。
- queue。
- bind。
- thread。
等等。
二、线程池的接口设计
(1)封装一个线程池的类。
(2)线程池的初始化:设置线程的数量。
(3)启动线程池:创建线程等工作。
(4)执行任务的函数。
(5)停止线程池。
(6)等所有任务执行完成,退出执行函数。
2.1、类封装
线程池类,采用c++11来实现。
#ifndef _CPP_THREAD_POOL_H_
#define _CPP_THREAD_POOL_H_#include <iostream>
#include <functional>
#include <memory>
#include <queue>
#include <mutex>
#include <vector>
#include <thread>
#include <future>#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endifusing namespace std;void getNow(timeval *tv);
int64_t getNowMs();#define TNOW getNow()
#define TNOWMS getNowMs()class CPP_ThreadPool{
protected:struct TaskFunc{TaskFunc(uint64_t expireTime):_expireTime(expireTime){}int64_t _expireTime=0;//超时的绝对时间function<void()> _func;};typedef shared_ptr<TaskFunc> TaskFuncPtr;/* * @brief 获取任务 ** *@return TaskFuncPtr */bool get(TaskFuncPtr& task);/** @brief 线程池是否退出*/bool isTerminate(){return _bTerminate;}/** @brief 线程运行态*/void run();public: /** @brief 构造函数 */CPP_ThreadPool(); /* * @brief 析构, 会停止所有线程 */virtual ~CPP_ThreadPool();/* * * @brief 初始化. * * @param num 工作线程个数 */bool init(size_t num);/** @brief 停止所有线程, 会等待所有线程结束 */void stop();/** @brief 启动所有线程 */bool start();/* * @brief 等待当前任务队列中, 所有工作全部结束(队列无任务). * @param millsecond 等待的时间(ms), -1:永远等待 * @return true, 所有工作都处理完毕 * false,超时退出 */bool waitForAllDone(int millsecond=-1);/** @brief 获取线程个数.* @return size_t 线程个数 */size_t getThreadNum(){unique_lock<mutex> lock(_mutex);return _threads.size();}/** @brief 获取当前线程池的任务数* @return size_t 线程池的任务数 */size_t getJobNum(){unique_lock<mutex> lock(_mutex);return _tasks.size();}/** @brief 用线程池启用任务(F是function, Args是参数) ** * @param ParentFunctor * @param tf * @return 返回任务的future对象, 可以通过这个对象来获取返回值 */template <class F,class... Args>auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>{return exec(0,f,args...);}/* * unused.** @brief 用线程池启用任务(F是function, Args是参数) * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃 * @param bind function * @return 返回任务的future对象, 可以通过这个对象来获取返回值 ** template <class F, class... Args> * 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数 * auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))> * std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值 * 返回值后置*/template<class F,class... Args>auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>{//获取现在时间int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;// 定义返回值类型using retType=decltype(f(args...));// 封装任务auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));// 封装任务指针,设置过期时间TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);fPtr->_func=[task](){(*task)();};unique_lock<mutex> lock(_mutex);// 插入任务_tasks.push(fPtr);// 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify_condition.notify_one();return task->get_future();}protected:size_t _threadNum;//线程数量bool _bTerminate;//判定是否终止线程池mutex _mutex; //唯一锁vector<thread*> _threads; //工作线程数组queue<TaskFuncPtr> _tasks; //任务队列condition_variable _condition;//条件变量atomic<int> _atomic{0};//原子变量
};#endif
使用示例:
CPP_ThreadPool tpool;
tpool.init(5); //初始化线程池线程数
//启动线程方式
tpool.start();
//将任务丢到线程池中*
tpool.exec(testFunction, 10); //参数和start相同
//等待线程池结束
tpool.waitForAllDone(1000); //参数<0时, 表示无限等待(注意有人调用stop也会推出)
//此时: 外部需要结束线程池是调用
tpool.stop();
注意:ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如:
int testInt(int i)
{ return i;
}
auto f = tpool.exec(testInt, 5);
cout << f.get() << endl; //当testInt在线程池中执行后, f.get()会返回数值5 class Test
{
public: int test(int i);
};
Test t;
auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10);
//返回的future对象, 可以检查是否执行
cout << f.get() << endl;
2.2、线程池的初始化
主要是设置线程池中线程的数量,如果线程池已经存在则直接返回,防止重复初始化。
bool CPP_ThreadPool::init(size_t num)
{unique_lock<mutex> lock(_mutex);if(!_threads.empty())return false;_threadNum=num;return true;
}
2.3、线程池的启动
根据设置的线程数量,创建线程并保存在一个数组中。如果线程池已经存在则直接返回,防止重复启动。
bool CPP_ThreadPool::start()
{unique_lock<mutex> lock(_mutex);if(!_threads.empty())return false;for(size_t i=0;i<_threadNum;i++){_threads.push_back(new thread(&CPP_ThreadPool::run,this));}return true;
}
2.4、线程池的停止
设置线程退出条件,并通知所有线程。停止时,要等待所有线程都执行完任务,再销毁线程。
需要注意锁的粒度。
void CPP_ThreadPool::stop()
{// 注意要有这个{},不然会死锁。{unique_lock<mutex> lock(_mutex);_bTerminate=true;_condition.notify_all();}size_t thdCount=_threads.size();for(size_t i=0;i<thdCount;i++){if(_threads[i]->joinable()){_threads[i]->join();}delete _threads[i];_threads[i]=NULL;}unique_lock<mutex> lock(_mutex);_threads.clear();
}
2.5、线程的执行函数run()
读取任务:判断任务是否存在,如果任务队列为空,则进入等待状态直到任务队列不为空或退出线程池(这里需要两次判断,因为可能存在虚假唤醒)。
执行任务:调用匿名函数。
检测所有任务都是否执行完毕:这里使用了原子变量来检测任务是否都执行完,原因在于任务队列为空不代表任务已经执行完(任务可能还在运行中、也可能是任务刚弹出还没运行),使用原子变量来计数就更严谨。
bool CPP_ThreadPool::get(TaskFuncPtr& task)
{unique_lock<mutex> lock(_mutex);if(_tasks.empty())//判断任务是否存在{_condition.wait(lock,[this]{return _bTerminate || !_tasks.empty();//唤醒条件});}if(_bTerminate)return false;if(!_tasks.empty())//判断任务是否存在{task=move(_tasks.front());// 使用移动语义_tasks.pop();//弹出一个任务return true;}return false;
}// 执行任务的线程
void CPP_ThreadPool::run()
{while(!isTerminate()){TaskFuncPtr task;// 读取任务bool ok=get(task);if(ok){++_atomic;try{if(task->_expireTime!=0 && task->_expireTime < TNOWMS){// 处理超时任务}elsetask->_func();//执行任务}catch(...){}--_atomic;// 任务执行完毕,这里只是为了通知waitForAllDoneunique_lock<mutex> lock(_mutex);if(_atomic==0 && _tasks.empty())_condition.notify_all();}}
}
2.6、任务的运行函数
这里使用了可变模块参数、智能指针、bind、function、捕获列表的相关技术知识。
返回任务的future对象, 可以通过这个对象来获取返回值。
超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃。
可变模块参数对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数。
/** @brief 用线程池启用任务(F是function, Args是参数) ** * @param ParentFunctor * @param tf * @return 返回任务的future对象, 可以通过这个对象来获取返回值 */template <class F,class... Args>auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>{return exec(0,f,args...);}/* * unused.** @brief 用线程池启用任务(F是function, Args是参数) * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃 * @param bind function * @return 返回任务的future对象, 可以通过这个对象来获取返回值 ** template <class F, class... Args> * 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数 * auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))> * std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值 * 返回值后置*/template<class F,class... Args>auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>{//获取现在时间int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;// 定义返回值类型using retType=decltype(f(args...));// 封装任务auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));// 封装任务指针,设置过期时间TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);fPtr->_func=[task](){(*task)();};unique_lock<mutex> lock(_mutex);// 插入任务_tasks.push(fPtr);// 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify_condition.notify_one();return task->get_future();}
2.7、等待所有线程结束
bool CPP_ThreadPool::waitForAllDone(int millsecond)
{unique_lock<mutex> lock(_mutex);if(_tasks.empty())return true;if(millsecond<0){_condition.wait(lock,[this]{ return _tasks.empty();});return true;}else{return _condition.wait_for(lock,chrono::milliseconds(millsecond),[this]{ return _tasks.empty();});}
}
三、测试线程池
#include <iostream>
#include "cppThreadPool.h"using namespace std;void func1(int a)
{ cout << "func1() a=" << a << endl;
}
void func2(int a, string b)
{ cout << "func2() a=" << a << ", b=" << b<< endl;
}void func3()
{cout<<"func3"<<endl;
}void test01()
{cout<<"test 01"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池// 执行任务threadpool.exec(func1,10);threadpool.exec(func2,20,"FLY.");threadpool.exec(1000,func3);threadpool.waitForAllDone();threadpool.stop();
}int func1_future(int a)
{ cout << "func1() a=" << a << endl; return a;
}string func2_future(int a, string b)
{ cout << "func2() a=" << a << ", b=" << b<< endl; return b;
}void test02()
{cout<<"test 02"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池future<decltype(func1_future(0))> ret01=threadpool.exec(func1_future,10);future<string> ret02=threadpool.exec(func2_future,20,"FLY.");threadpool.waitForAllDone();cout<<"ret01 = "<<ret01.get()<<endl;cout<<"ret02 = "<<ret02.get()<<endl;threadpool.stop();}class Test{
public:int test(int a){cout<<_name<<": a = "<<a<<endl;return a+1;}void setname(string name){_name=name;}string _name;};void test03()
{cout<<"test 03"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池Test t1;Test t2;t1.setname("Test 1");t2.setname("Test 2");auto f1=threadpool.exec(bind(&Test::test,&t1,placeholders::_1),10);auto f2=threadpool.exec(bind(&Test::test,&t2,placeholders::_1),20);threadpool.waitForAllDone();cout<<"f1 = "<<f1.get()<<endl;cout<<"f2 = "<<f2.get()<<endl;threadpool.stop();}int main(int argc,char **argv)
{// 简单测试线程池test01();// 测试任务函数返回值test02();// 测试类对象函数的绑定test03();return 0;
}
执行结果:
test 01
func1() a=10
func2() a=20, b=FLY.
func3
test 02
func1() a=10
func2() a=20, b=FLY.
ret01 = 10
ret02 = FLY.
test 03
Test 1: a = 10
Test 2: a = 20
f1 = 11
f2 = 21
四、源码地址
源码已经上传github。
总结
线程池的核心:初始化、线程启动、执行函数、线程停止。
相关文章:

C++实现线程池
C实现线程池一、前言二、线程池的接口设计2.1、类封装2.2、线程池的初始化2.3、线程池的启动2.4、线程池的停止2.5、线程的执行函数run()2.6、任务的运行函数2.7、等待所有线程结束三、测试线程池四、源码地址总结一、前言 C实现的线程池,可能涉及以下知识点&#…...

2023最新Java面试手册(性能优化+微服务架构+并发编程+开源框架)
Java面试手册 一、性能优化面试专栏 1.1、 tomcat性能优化整理 1.2、JVM性能优化整理 1.3、Mysql性能优化整理 二、微服务架构面试专栏 2.1、SpringCloud面试整理 2.2、SpringBoot面试整理 2.3、Dubbo面试整理 三、并发编程高级面试专栏 四、开源框架面试题专栏 4.1、Sprin…...

对灵敏度分析技术进行建模(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...

完整爬虫学习笔记(第一章)
文章目录前言:fu:. 爬虫概述:hotdog:原理解剖:one: 服务器渲染:two: 前端JS渲染:fire: 第一个爬虫程序案例总结前言 最近正在学习Python网络爬虫的相关知识,鉴于本人Python水平有限 , 对Python并无太深的理解,所以此文章的主要目的在于抛砖引玉…...

会计师项目管理软件是什么,哪些必不可少的功能
欢迎阅读现代金融专业人士的会计师项目管理指南。在本文中,我们将深入探讨在基于项目的会计的各个方面使用项目管理方法的好处。我们还将教您面临哪些挑战以及如何为您的团队选择最佳工具。 为什么会计师的项目管理很重要? 在会计方面,目标始…...
第 8 章 优化
目录 8.1 优化概述 8.2 优化 SQL 语句 8.3 优化和指标 8.4 优化数据库结构 8.5 优化 InnoDB 表 8.6 优化 MyISAM 表 8.7 内存表的优化 8.8 了解查询执行计划 8.9 控制查询优化器 8.10 缓冲和缓存 8.11 优化锁定操作 8.12 优化 MySQL 服务器 8.13 衡量性能ÿ…...
剑指offer -- java题解
剑指offer -- java题解刷题地址1、数字在升序数组中出现的次数2、二叉搜索树的第k个节点3、二叉树的深度4、数组中只出现一次的两个数字5、和为S的两个数字6、左旋转字符串7、滑动窗口的最大值8、扑克牌顺子9、孩子们的游戏(圆圈中最后剩下的数)10、买卖股票的最好时机(一)刷题…...

若依ruoyi——手把手教你制作自己的管理系统【二、修改样式】
阿里图标一( ̄︶ ̄*)) 图片白嫖一((* ̄3 ̄)╭ ********* 专栏略长 爆肝万字 细节狂魔 请准备好一键三连 ********* 运行成功后: idea后台正常先挂着 我习惯用VScode操作 当然如果有两台机子 一个挂后台一个改前端就更好…...
2023.2.14每日一题——455. 分发饼干
每日一题题目描述解题核心解法一:双指针题目描述 题目链接:455. 分发饼干 假设你是一位很棒的家长,想要给你的孩子们一些小饼干。但是,每个孩子最多只能给一块饼干。 对每个孩子 i,都有一个胃口值 g[i],…...
MySQL入门篇-MySQL常用字符函数小结
备注:测试数据库版本为MySQL 8.0 这个blog我们来聊聊常见的字符函数 函数名函数用途UPPER()返回大写的字符LOWER()返回小写的字符LTRIM()左边去掉空格TRIM()去掉空格RTRIM()右边去掉空格SPACE()返回指定长度的空格CONCAT()连接字符串CONCAT_WS()指定分隔符连接字符串CHAR_LEN…...

解决不同影像裁剪后栅格数据行列不一致问题
前言在处理栅格数据时,尽管用同一个矢量文件裁剪栅格数据,不同数据来源的栅格行列数也会出现不一致的情况。如果忽略或解决不好,会导致后续数据处理出现意想不到的误差或错误,尤其是利用编程实现数据处理时。因此,应当…...

visual studio2022配置opencv
标题:在vs下配置使用opencv 流程: 1、下载安装opencv 2、添加环境变量 3、vs中配置属性 4、使用 5、可能遇到的报错和解决 1、 下载安装opencv 官网下载地址: https://opencv.org/releases/ 我这里是windows环境,所以选择点击w…...

什么是销售管理?销售管理的五大职能
销售管理听起来很简单,似乎只是负责销售并确保客户满意,但事实上,它远不止于此。 销售管理的实际职能包括监督销售团队的工作,制定计划和设定目标,通常还包括确保销售流程的效率以获得最佳业务结果。 什么是销售管理…...

[CVPR‘22] EG3D: Efficient Geometry-aware 3D Generative Adversarial Networks
paper: https://nvlabs.github.io/eg3d/media/eg3d.pdfproject: EG3D: Efficient Geometry-aware 3D GANscode: GitHub - NVlabs/eg3d总结: 本文提出一种hybrid explicit-implicit 3D representation: tri-plane hybrid 3D representation,该方法不仅有…...

Learning C++ No.9【STL No.1】
引言: 北京时间:2023/2/13/18:29,开学正式上课第一天,直接上午一节思想政治,下午一节思想政治,生怕我们……,但,我深知该课的无聊,所以充分利用时间,把我的小…...

Apifox推荐-django后台验证token配置
最近事情很多,但是我还是想写一片推荐apifox的文章。 优秀的UI,清晰地逻辑,丰富的功能。对于我们这种业余选手来说,他真的很便利。 更新新版后有了更多贴心的功能,让你感觉他是一个有温度的工具。 最重要的是…...

SAS应用入门学习笔记6
SQL (SAS): Features: 1)不需要在每个query中重复调用每个SQL; 2)每个statement都是独立去完成的; 3)我们是没有proc print和proc sort语句的;(order by) key synta…...

【3D目标检测】Pseudo-Stereo for Monocular 3D Object Detection in Autonomous Driving
目录概述细节背景与整体流程图像级别生成特征级别生成损失函数学习深度感知的特征概述 本文是基于单目图像的3D目标检测方法。 【2021】【MonoDLE】 研究的问题: 能否借助立体图像检测算法提高单目图像检测的效果如何实现右侧图像的生成 解决的方法: 受启发于伪…...
git 常用命令之 git branch
大家好,我是 17。 新建 git 分支 分支是并行开发的基础。分支名称的本质是对分支最后一个提交的引用。分支有多个,但 HEAD 只有一个,可以认为 HEAD 是"current branch"(当下的分支)。当你用git switch切换分支的时候,…...
Oracle数据泵
Oracle 数据泵:概览 作为一个基于服务器的用于高速移动数据与元数据的工具, Oracle 数据泵具有以下特点: •可通过 DBMS_DATAPUMP 调用 •可提供以下工具: – expdp – impdp – 基于 Web 的界面 •提供四种数据移动方法ÿ…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...

stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
Go 语言接口详解
Go 语言接口详解 核心概念 接口定义 在 Go 语言中,接口是一种抽象类型,它定义了一组方法的集合: // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的: // 矩形结构体…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
docker 部署发现spring.profiles.active 问题
报错: org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...
Java 二维码
Java 二维码 **技术:**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...

零知开源——STM32F103RBT6驱动 ICM20948 九轴传感器及 vofa + 上位机可视化教程
STM32F1 本教程使用零知标准板(STM32F103RBT6)通过I2C驱动ICM20948九轴传感器,实现姿态解算,并通过串口将数据实时发送至VOFA上位机进行3D可视化。代码基于开源库修改优化,适合嵌入式及物联网开发者。在基础驱动上新增…...
uniapp 实现腾讯云IM群文件上传下载功能
UniApp 集成腾讯云IM实现群文件上传下载功能全攻略 一、功能背景与技术选型 在团队协作场景中,群文件共享是核心需求之一。本文将介绍如何基于腾讯云IMCOS,在uniapp中实现: 群内文件上传/下载文件元数据管理下载进度追踪跨平台文件预览 二…...

【Veristand】Veristand环境安装教程-Linux RT / Windows
首先声明,此教程是针对Simulink编译模型并导入Veristand中编写的,同时需要注意的是老用户编译可能用的是Veristand Model Framework,那个是历史版本,且NI不会再维护,新版本编译支持为VeriStand Model Generation Suppo…...