C++实现线程池
C++实现线程池
- 一、前言
 - 二、线程池的接口设计
 - 2.1、类封装
 - 2.2、线程池的初始化
 - 2.3、线程池的启动
 - 2.4、线程池的停止
 - 2.5、线程的执行函数run()
 - 2.6、任务的运行函数
 - 2.7、等待所有线程结束
 
- 三、测试线程池
 - 四、源码地址
 - 总结
 
一、前言
C++实现的线程池,可能涉及以下知识点:
- decltype。
 - packaged_task。
 - make_shared。
 - mutex。
 - unique_lock。
 - notify_one。
 - future。
 - queue。
 - bind。
 - thread。
等等。

 
二、线程池的接口设计
(1)封装一个线程池的类。
 (2)线程池的初始化:设置线程的数量。
 (3)启动线程池:创建线程等工作。
 (4)执行任务的函数。
 (5)停止线程池。
 (6)等所有任务执行完成,退出执行函数。
2.1、类封装
线程池类,采用c++11来实现。
#ifndef _CPP_THREAD_POOL_H_
#define _CPP_THREAD_POOL_H_#include <iostream>
#include <functional>
#include <memory>
#include <queue>
#include <mutex>
#include <vector>
#include <thread>
#include <future>#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endifusing namespace std;void getNow(timeval *tv);
int64_t getNowMs();#define TNOW    getNow()
#define TNOWMS  getNowMs()class CPP_ThreadPool{
protected:struct TaskFunc{TaskFunc(uint64_t expireTime):_expireTime(expireTime){}int64_t _expireTime=0;//超时的绝对时间function<void()> _func;};typedef shared_ptr<TaskFunc> TaskFuncPtr;/* * @brief 获取任务 ** *@return TaskFuncPtr */bool get(TaskFuncPtr& task);/** @brief 线程池是否退出*/bool isTerminate(){return _bTerminate;}/** @brief 线程运行态*/void run();public: /** @brief 构造函数 */CPP_ThreadPool(); /* * @brief 析构, 会停止所有线程 */virtual ~CPP_ThreadPool();/* * * @brief 初始化. * * @param num 工作线程个数 */bool init(size_t num);/** @brief 停止所有线程, 会等待所有线程结束 */void stop();/** @brief 启动所有线程 */bool start();/* * @brief 等待当前任务队列中, 所有工作全部结束(队列无任务). * @param millsecond 等待的时间(ms), -1:永远等待 * @return true, 所有工作都处理完毕 * false,超时退出 */bool waitForAllDone(int millsecond=-1);/** @brief 获取线程个数.* @return size_t 线程个数 */size_t getThreadNum(){unique_lock<mutex> lock(_mutex);return _threads.size();}/**  @brief 获取当前线程池的任务数* @return size_t 线程池的任务数 */size_t getJobNum(){unique_lock<mutex> lock(_mutex);return _tasks.size();}/** @brief 用线程池启用任务(F是function, Args是参数) ** * @param ParentFunctor * @param tf * @return 返回任务的future对象, 可以通过这个对象来获取返回值 */template <class F,class... Args>auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>{return exec(0,f,args...);}/* * unused.** @brief 用线程池启用任务(F是function, Args是参数) * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃 * @param bind function * @return 返回任务的future对象, 可以通过这个对象来获取返回值 ** template <class F, class... Args> * 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数 * auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))> * std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值 * 返回值后置*/template<class F,class... Args>auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>{//获取现在时间int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;// 定义返回值类型using retType=decltype(f(args...));// 封装任务auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));// 封装任务指针,设置过期时间TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);fPtr->_func=[task](){(*task)();};unique_lock<mutex> lock(_mutex);// 插入任务_tasks.push(fPtr);// 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify_condition.notify_one();return task->get_future();}protected:size_t  _threadNum;//线程数量bool    _bTerminate;//判定是否终止线程池mutex   _mutex;     //唯一锁vector<thread*> _threads;   //工作线程数组queue<TaskFuncPtr> _tasks;  //任务队列condition_variable _condition;//条件变量atomic<int>         _atomic{0};//原子变量
};#endif
 
使用示例:
CPP_ThreadPool tpool; 
tpool.init(5); //初始化线程池线程数 
//启动线程方式 
tpool.start(); 
//将任务丢到线程池中* 
tpool.exec(testFunction, 10); //参数和start相同 
//等待线程池结束 
tpool.waitForAllDone(1000); //参数<0时, 表示无限等待(注意有人调用stop也会推出) 
//此时: 外部需要结束线程池是调用 
tpool.stop(); 
 
注意:ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如:
int testInt(int i) 
{ return i; 
} 
auto f = tpool.exec(testInt, 5); 
cout << f.get() << endl; //当testInt在线程池中执行后, f.get()会返回数值5 class Test 
{ 
public: int test(int i); 
}; 
Test t; 
auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10); 
//返回的future对象, 可以检查是否执行 
cout << f.get() << endl; 
 
2.2、线程池的初始化
主要是设置线程池中线程的数量,如果线程池已经存在则直接返回,防止重复初始化。
bool CPP_ThreadPool::init(size_t num)
{unique_lock<mutex> lock(_mutex);if(!_threads.empty())return false;_threadNum=num;return true;
}
 
2.3、线程池的启动
根据设置的线程数量,创建线程并保存在一个数组中。如果线程池已经存在则直接返回,防止重复启动。
bool CPP_ThreadPool::start()
{unique_lock<mutex> lock(_mutex);if(!_threads.empty())return false;for(size_t i=0;i<_threadNum;i++){_threads.push_back(new thread(&CPP_ThreadPool::run,this));}return true;
}
 
2.4、线程池的停止
设置线程退出条件,并通知所有线程。停止时,要等待所有线程都执行完任务,再销毁线程。
 需要注意锁的粒度。
void CPP_ThreadPool::stop()
{// 注意要有这个{},不然会死锁。{unique_lock<mutex> lock(_mutex);_bTerminate=true;_condition.notify_all();}size_t thdCount=_threads.size();for(size_t i=0;i<thdCount;i++){if(_threads[i]->joinable()){_threads[i]->join();}delete _threads[i];_threads[i]=NULL;}unique_lock<mutex> lock(_mutex);_threads.clear();
}
 
2.5、线程的执行函数run()
读取任务:判断任务是否存在,如果任务队列为空,则进入等待状态直到任务队列不为空或退出线程池(这里需要两次判断,因为可能存在虚假唤醒)。
 执行任务:调用匿名函数。
 检测所有任务都是否执行完毕:这里使用了原子变量来检测任务是否都执行完,原因在于任务队列为空不代表任务已经执行完(任务可能还在运行中、也可能是任务刚弹出还没运行),使用原子变量来计数就更严谨。
bool CPP_ThreadPool::get(TaskFuncPtr& task)
{unique_lock<mutex> lock(_mutex);if(_tasks.empty())//判断任务是否存在{_condition.wait(lock,[this]{return _bTerminate || !_tasks.empty();//唤醒条件});}if(_bTerminate)return false;if(!_tasks.empty())//判断任务是否存在{task=move(_tasks.front());// 使用移动语义_tasks.pop();//弹出一个任务return true;}return false;
}// 执行任务的线程
void CPP_ThreadPool::run()
{while(!isTerminate()){TaskFuncPtr task;// 读取任务bool ok=get(task);if(ok){++_atomic;try{if(task->_expireTime!=0 && task->_expireTime < TNOWMS){// 处理超时任务}elsetask->_func();//执行任务}catch(...){}--_atomic;// 任务执行完毕,这里只是为了通知waitForAllDoneunique_lock<mutex> lock(_mutex);if(_atomic==0 && _tasks.empty())_condition.notify_all();}}
}
 
2.6、任务的运行函数
这里使用了可变模块参数、智能指针、bind、function、捕获列表的相关技术知识。
 返回任务的future对象, 可以通过这个对象来获取返回值。
 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃。
 可变模块参数对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数。
  /** @brief 用线程池启用任务(F是function, Args是参数) ** * @param ParentFunctor * @param tf * @return 返回任务的future对象, 可以通过这个对象来获取返回值 */template <class F,class... Args>auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>{return exec(0,f,args...);}/* * unused.** @brief 用线程池启用任务(F是function, Args是参数) * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃 * @param bind function * @return 返回任务的future对象, 可以通过这个对象来获取返回值 ** template <class F, class... Args> * 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数 * auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))> * std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值 * 返回值后置*/template<class F,class... Args>auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>{//获取现在时间int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;// 定义返回值类型using retType=decltype(f(args...));// 封装任务auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));// 封装任务指针,设置过期时间TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);fPtr->_func=[task](){(*task)();};unique_lock<mutex> lock(_mutex);// 插入任务_tasks.push(fPtr);// 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify_condition.notify_one();return task->get_future();}
 
2.7、等待所有线程结束
bool CPP_ThreadPool::waitForAllDone(int millsecond)
{unique_lock<mutex> lock(_mutex);if(_tasks.empty())return true;if(millsecond<0){_condition.wait(lock,[this]{ return _tasks.empty();});return true;}else{return _condition.wait_for(lock,chrono::milliseconds(millsecond),[this]{ return _tasks.empty();});}
}
 
三、测试线程池
#include <iostream>
#include "cppThreadPool.h"using namespace std;void func1(int a) 
{ cout << "func1() a=" << a << endl; 
}
void func2(int a, string b) 
{ cout << "func2() a=" << a << ", b=" << b<< endl; 
}void func3()
{cout<<"func3"<<endl;
}void test01()
{cout<<"test 01"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池// 执行任务threadpool.exec(func1,10);threadpool.exec(func2,20,"FLY.");threadpool.exec(1000,func3);threadpool.waitForAllDone();threadpool.stop();
}int func1_future(int a) 
{ cout << "func1() a=" << a << endl; return a; 
}string func2_future(int a, string b) 
{ cout << "func2() a=" << a << ", b=" << b<< endl; return b; 
}void test02()
{cout<<"test 02"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池future<decltype(func1_future(0))> ret01=threadpool.exec(func1_future,10);future<string> ret02=threadpool.exec(func2_future,20,"FLY.");threadpool.waitForAllDone();cout<<"ret01 = "<<ret01.get()<<endl;cout<<"ret02 = "<<ret02.get()<<endl;threadpool.stop();}class Test{
public:int test(int a){cout<<_name<<": a = "<<a<<endl;return a+1;}void setname(string name){_name=name;}string _name;};void test03()
{cout<<"test 03"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池Test t1;Test t2;t1.setname("Test 1");t2.setname("Test 2");auto f1=threadpool.exec(bind(&Test::test,&t1,placeholders::_1),10);auto f2=threadpool.exec(bind(&Test::test,&t2,placeholders::_1),20);threadpool.waitForAllDone();cout<<"f1 = "<<f1.get()<<endl;cout<<"f2 = "<<f2.get()<<endl;threadpool.stop();}int main(int argc,char **argv)
{// 简单测试线程池test01();// 测试任务函数返回值test02();// 测试类对象函数的绑定test03();return 0;
}
 
执行结果:
test 01
func1() a=10
func2() a=20, b=FLY.
func3
test 02
func1() a=10
func2() a=20, b=FLY.
ret01 = 10
ret02 = FLY.
test 03
Test 1: a = 10
Test 2: a = 20
f1 = 11
f2 = 21
 
四、源码地址
源码已经上传github。
总结
线程池的核心:初始化、线程启动、执行函数、线程停止。

相关文章:
C++实现线程池
C实现线程池一、前言二、线程池的接口设计2.1、类封装2.2、线程池的初始化2.3、线程池的启动2.4、线程池的停止2.5、线程的执行函数run()2.6、任务的运行函数2.7、等待所有线程结束三、测试线程池四、源码地址总结一、前言 C实现的线程池,可能涉及以下知识点&#…...
2023最新Java面试手册(性能优化+微服务架构+并发编程+开源框架)
Java面试手册 一、性能优化面试专栏 1.1、 tomcat性能优化整理 1.2、JVM性能优化整理 1.3、Mysql性能优化整理 二、微服务架构面试专栏 2.1、SpringCloud面试整理 2.2、SpringBoot面试整理 2.3、Dubbo面试整理 三、并发编程高级面试专栏 四、开源框架面试题专栏 4.1、Sprin…...
对灵敏度分析技术进行建模(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
完整爬虫学习笔记(第一章)
文章目录前言:fu:. 爬虫概述:hotdog:原理解剖:one: 服务器渲染:two: 前端JS渲染:fire: 第一个爬虫程序案例总结前言 最近正在学习Python网络爬虫的相关知识,鉴于本人Python水平有限 , 对Python并无太深的理解,所以此文章的主要目的在于抛砖引玉…...
会计师项目管理软件是什么,哪些必不可少的功能
欢迎阅读现代金融专业人士的会计师项目管理指南。在本文中,我们将深入探讨在基于项目的会计的各个方面使用项目管理方法的好处。我们还将教您面临哪些挑战以及如何为您的团队选择最佳工具。 为什么会计师的项目管理很重要? 在会计方面,目标始…...
第 8 章 优化
目录 8.1 优化概述 8.2 优化 SQL 语句 8.3 优化和指标 8.4 优化数据库结构 8.5 优化 InnoDB 表 8.6 优化 MyISAM 表 8.7 内存表的优化 8.8 了解查询执行计划 8.9 控制查询优化器 8.10 缓冲和缓存 8.11 优化锁定操作 8.12 优化 MySQL 服务器 8.13 衡量性能ÿ…...
剑指offer -- java题解
剑指offer -- java题解刷题地址1、数字在升序数组中出现的次数2、二叉搜索树的第k个节点3、二叉树的深度4、数组中只出现一次的两个数字5、和为S的两个数字6、左旋转字符串7、滑动窗口的最大值8、扑克牌顺子9、孩子们的游戏(圆圈中最后剩下的数)10、买卖股票的最好时机(一)刷题…...
若依ruoyi——手把手教你制作自己的管理系统【二、修改样式】
阿里图标一( ̄︶ ̄*)) 图片白嫖一((* ̄3 ̄)╭ ********* 专栏略长 爆肝万字 细节狂魔 请准备好一键三连 ********* 运行成功后: idea后台正常先挂着 我习惯用VScode操作 当然如果有两台机子 一个挂后台一个改前端就更好…...
2023.2.14每日一题——455. 分发饼干
每日一题题目描述解题核心解法一:双指针题目描述 题目链接:455. 分发饼干 假设你是一位很棒的家长,想要给你的孩子们一些小饼干。但是,每个孩子最多只能给一块饼干。 对每个孩子 i,都有一个胃口值 g[i],…...
MySQL入门篇-MySQL常用字符函数小结
备注:测试数据库版本为MySQL 8.0 这个blog我们来聊聊常见的字符函数 函数名函数用途UPPER()返回大写的字符LOWER()返回小写的字符LTRIM()左边去掉空格TRIM()去掉空格RTRIM()右边去掉空格SPACE()返回指定长度的空格CONCAT()连接字符串CONCAT_WS()指定分隔符连接字符串CHAR_LEN…...
解决不同影像裁剪后栅格数据行列不一致问题
前言在处理栅格数据时,尽管用同一个矢量文件裁剪栅格数据,不同数据来源的栅格行列数也会出现不一致的情况。如果忽略或解决不好,会导致后续数据处理出现意想不到的误差或错误,尤其是利用编程实现数据处理时。因此,应当…...
visual studio2022配置opencv
标题:在vs下配置使用opencv 流程: 1、下载安装opencv 2、添加环境变量 3、vs中配置属性 4、使用 5、可能遇到的报错和解决 1、 下载安装opencv 官网下载地址: https://opencv.org/releases/ 我这里是windows环境,所以选择点击w…...
什么是销售管理?销售管理的五大职能
销售管理听起来很简单,似乎只是负责销售并确保客户满意,但事实上,它远不止于此。 销售管理的实际职能包括监督销售团队的工作,制定计划和设定目标,通常还包括确保销售流程的效率以获得最佳业务结果。 什么是销售管理…...
[CVPR‘22] EG3D: Efficient Geometry-aware 3D Generative Adversarial Networks
paper: https://nvlabs.github.io/eg3d/media/eg3d.pdfproject: EG3D: Efficient Geometry-aware 3D GANscode: GitHub - NVlabs/eg3d总结: 本文提出一种hybrid explicit-implicit 3D representation: tri-plane hybrid 3D representation,该方法不仅有…...
Learning C++ No.9【STL No.1】
引言: 北京时间:2023/2/13/18:29,开学正式上课第一天,直接上午一节思想政治,下午一节思想政治,生怕我们……,但,我深知该课的无聊,所以充分利用时间,把我的小…...
Apifox推荐-django后台验证token配置
最近事情很多,但是我还是想写一片推荐apifox的文章。 优秀的UI,清晰地逻辑,丰富的功能。对于我们这种业余选手来说,他真的很便利。 更新新版后有了更多贴心的功能,让你感觉他是一个有温度的工具。 最重要的是…...
SAS应用入门学习笔记6
SQL (SAS): Features: 1)不需要在每个query中重复调用每个SQL; 2)每个statement都是独立去完成的; 3)我们是没有proc print和proc sort语句的;(order by) key synta…...
【3D目标检测】Pseudo-Stereo for Monocular 3D Object Detection in Autonomous Driving
目录概述细节背景与整体流程图像级别生成特征级别生成损失函数学习深度感知的特征概述 本文是基于单目图像的3D目标检测方法。 【2021】【MonoDLE】 研究的问题: 能否借助立体图像检测算法提高单目图像检测的效果如何实现右侧图像的生成 解决的方法: 受启发于伪…...
git 常用命令之 git branch
大家好,我是 17。 新建 git 分支 分支是并行开发的基础。分支名称的本质是对分支最后一个提交的引用。分支有多个,但 HEAD 只有一个,可以认为 HEAD 是"current branch"(当下的分支)。当你用git switch切换分支的时候,…...
Oracle数据泵
Oracle 数据泵:概览 作为一个基于服务器的用于高速移动数据与元数据的工具, Oracle 数据泵具有以下特点: •可通过 DBMS_DATAPUMP 调用 •可提供以下工具: – expdp – impdp – 基于 Web 的界面 •提供四种数据移动方法ÿ…...
网络六边形受到攻击
大家读完觉得有帮助记得关注和点赞!!! 抽象 现代智能交通系统 (ITS) 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 (…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...
循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...
Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...
【Go】3、Go语言进阶与依赖管理
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes࿰…...
深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南
🚀 C extern 关键字深度解析:跨文件编程的终极指南 📅 更新时间:2025年6月5日 🏷️ 标签:C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言🔥一、extern 是什么?&…...
06 Deep learning神经网络编程基础 激活函数 --吴恩达
深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...
AI书签管理工具开发全记录(十九):嵌入资源处理
1.前言 📝 在上一篇文章中,我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源,方便后续将资源打包到一个可执行文件中。 2.embed介绍 🎯 Go 1.16 引入了革命性的 embed 包,彻底改变了静态资源管理的…...
