C++实现线程池
C++实现线程池
- 一、前言
- 二、线程池的接口设计
- 2.1、类封装
- 2.2、线程池的初始化
- 2.3、线程池的启动
- 2.4、线程池的停止
- 2.5、线程的执行函数run()
- 2.6、任务的运行函数
- 2.7、等待所有线程结束
- 三、测试线程池
- 四、源码地址
- 总结
一、前言
C++实现的线程池,可能涉及以下知识点:
- decltype。
- packaged_task。
- make_shared。
- mutex。
- unique_lock。
- notify_one。
- future。
- queue。
- bind。
- thread。
等等。

二、线程池的接口设计
(1)封装一个线程池的类。
(2)线程池的初始化:设置线程的数量。
(3)启动线程池:创建线程等工作。
(4)执行任务的函数。
(5)停止线程池。
(6)等所有任务执行完成,退出执行函数。
2.1、类封装
线程池类,采用c++11来实现。
#ifndef _CPP_THREAD_POOL_H_
#define _CPP_THREAD_POOL_H_#include <iostream>
#include <functional>
#include <memory>
#include <queue>
#include <mutex>
#include <vector>
#include <thread>
#include <future>#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endifusing namespace std;void getNow(timeval *tv);
int64_t getNowMs();#define TNOW getNow()
#define TNOWMS getNowMs()class CPP_ThreadPool{
protected:struct TaskFunc{TaskFunc(uint64_t expireTime):_expireTime(expireTime){}int64_t _expireTime=0;//超时的绝对时间function<void()> _func;};typedef shared_ptr<TaskFunc> TaskFuncPtr;/* * @brief 获取任务 ** *@return TaskFuncPtr */bool get(TaskFuncPtr& task);/** @brief 线程池是否退出*/bool isTerminate(){return _bTerminate;}/** @brief 线程运行态*/void run();public: /** @brief 构造函数 */CPP_ThreadPool(); /* * @brief 析构, 会停止所有线程 */virtual ~CPP_ThreadPool();/* * * @brief 初始化. * * @param num 工作线程个数 */bool init(size_t num);/** @brief 停止所有线程, 会等待所有线程结束 */void stop();/** @brief 启动所有线程 */bool start();/* * @brief 等待当前任务队列中, 所有工作全部结束(队列无任务). * @param millsecond 等待的时间(ms), -1:永远等待 * @return true, 所有工作都处理完毕 * false,超时退出 */bool waitForAllDone(int millsecond=-1);/** @brief 获取线程个数.* @return size_t 线程个数 */size_t getThreadNum(){unique_lock<mutex> lock(_mutex);return _threads.size();}/** @brief 获取当前线程池的任务数* @return size_t 线程池的任务数 */size_t getJobNum(){unique_lock<mutex> lock(_mutex);return _tasks.size();}/** @brief 用线程池启用任务(F是function, Args是参数) ** * @param ParentFunctor * @param tf * @return 返回任务的future对象, 可以通过这个对象来获取返回值 */template <class F,class... Args>auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>{return exec(0,f,args...);}/* * unused.** @brief 用线程池启用任务(F是function, Args是参数) * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃 * @param bind function * @return 返回任务的future对象, 可以通过这个对象来获取返回值 ** template <class F, class... Args> * 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数 * auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))> * std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值 * 返回值后置*/template<class F,class... Args>auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>{//获取现在时间int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;// 定义返回值类型using retType=decltype(f(args...));// 封装任务auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));// 封装任务指针,设置过期时间TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);fPtr->_func=[task](){(*task)();};unique_lock<mutex> lock(_mutex);// 插入任务_tasks.push(fPtr);// 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify_condition.notify_one();return task->get_future();}protected:size_t _threadNum;//线程数量bool _bTerminate;//判定是否终止线程池mutex _mutex; //唯一锁vector<thread*> _threads; //工作线程数组queue<TaskFuncPtr> _tasks; //任务队列condition_variable _condition;//条件变量atomic<int> _atomic{0};//原子变量
};#endif
使用示例:
CPP_ThreadPool tpool;
tpool.init(5); //初始化线程池线程数
//启动线程方式
tpool.start();
//将任务丢到线程池中*
tpool.exec(testFunction, 10); //参数和start相同
//等待线程池结束
tpool.waitForAllDone(1000); //参数<0时, 表示无限等待(注意有人调用stop也会推出)
//此时: 外部需要结束线程池是调用
tpool.stop();
注意:ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如:
int testInt(int i)
{ return i;
}
auto f = tpool.exec(testInt, 5);
cout << f.get() << endl; //当testInt在线程池中执行后, f.get()会返回数值5 class Test
{
public: int test(int i);
};
Test t;
auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10);
//返回的future对象, 可以检查是否执行
cout << f.get() << endl;
2.2、线程池的初始化
主要是设置线程池中线程的数量,如果线程池已经存在则直接返回,防止重复初始化。
bool CPP_ThreadPool::init(size_t num)
{unique_lock<mutex> lock(_mutex);if(!_threads.empty())return false;_threadNum=num;return true;
}
2.3、线程池的启动
根据设置的线程数量,创建线程并保存在一个数组中。如果线程池已经存在则直接返回,防止重复启动。
bool CPP_ThreadPool::start()
{unique_lock<mutex> lock(_mutex);if(!_threads.empty())return false;for(size_t i=0;i<_threadNum;i++){_threads.push_back(new thread(&CPP_ThreadPool::run,this));}return true;
}
2.4、线程池的停止
设置线程退出条件,并通知所有线程。停止时,要等待所有线程都执行完任务,再销毁线程。
需要注意锁的粒度。
void CPP_ThreadPool::stop()
{// 注意要有这个{},不然会死锁。{unique_lock<mutex> lock(_mutex);_bTerminate=true;_condition.notify_all();}size_t thdCount=_threads.size();for(size_t i=0;i<thdCount;i++){if(_threads[i]->joinable()){_threads[i]->join();}delete _threads[i];_threads[i]=NULL;}unique_lock<mutex> lock(_mutex);_threads.clear();
}
2.5、线程的执行函数run()
读取任务:判断任务是否存在,如果任务队列为空,则进入等待状态直到任务队列不为空或退出线程池(这里需要两次判断,因为可能存在虚假唤醒)。
执行任务:调用匿名函数。
检测所有任务都是否执行完毕:这里使用了原子变量来检测任务是否都执行完,原因在于任务队列为空不代表任务已经执行完(任务可能还在运行中、也可能是任务刚弹出还没运行),使用原子变量来计数就更严谨。
bool CPP_ThreadPool::get(TaskFuncPtr& task)
{unique_lock<mutex> lock(_mutex);if(_tasks.empty())//判断任务是否存在{_condition.wait(lock,[this]{return _bTerminate || !_tasks.empty();//唤醒条件});}if(_bTerminate)return false;if(!_tasks.empty())//判断任务是否存在{task=move(_tasks.front());// 使用移动语义_tasks.pop();//弹出一个任务return true;}return false;
}// 执行任务的线程
void CPP_ThreadPool::run()
{while(!isTerminate()){TaskFuncPtr task;// 读取任务bool ok=get(task);if(ok){++_atomic;try{if(task->_expireTime!=0 && task->_expireTime < TNOWMS){// 处理超时任务}elsetask->_func();//执行任务}catch(...){}--_atomic;// 任务执行完毕,这里只是为了通知waitForAllDoneunique_lock<mutex> lock(_mutex);if(_atomic==0 && _tasks.empty())_condition.notify_all();}}
}
2.6、任务的运行函数
这里使用了可变模块参数、智能指针、bind、function、捕获列表的相关技术知识。
返回任务的future对象, 可以通过这个对象来获取返回值。
超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃。
可变模块参数对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数。
/** @brief 用线程池启用任务(F是function, Args是参数) ** * @param ParentFunctor * @param tf * @return 返回任务的future对象, 可以通过这个对象来获取返回值 */template <class F,class... Args>auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>{return exec(0,f,args...);}/* * unused.** @brief 用线程池启用任务(F是function, Args是参数) * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃 * @param bind function * @return 返回任务的future对象, 可以通过这个对象来获取返回值 ** template <class F, class... Args> * 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数 * auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))> * std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值 * 返回值后置*/template<class F,class... Args>auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>{//获取现在时间int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;// 定义返回值类型using retType=decltype(f(args...));// 封装任务auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));// 封装任务指针,设置过期时间TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);fPtr->_func=[task](){(*task)();};unique_lock<mutex> lock(_mutex);// 插入任务_tasks.push(fPtr);// 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify_condition.notify_one();return task->get_future();}
2.7、等待所有线程结束
bool CPP_ThreadPool::waitForAllDone(int millsecond)
{unique_lock<mutex> lock(_mutex);if(_tasks.empty())return true;if(millsecond<0){_condition.wait(lock,[this]{ return _tasks.empty();});return true;}else{return _condition.wait_for(lock,chrono::milliseconds(millsecond),[this]{ return _tasks.empty();});}
}
三、测试线程池
#include <iostream>
#include "cppThreadPool.h"using namespace std;void func1(int a)
{ cout << "func1() a=" << a << endl;
}
void func2(int a, string b)
{ cout << "func2() a=" << a << ", b=" << b<< endl;
}void func3()
{cout<<"func3"<<endl;
}void test01()
{cout<<"test 01"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池// 执行任务threadpool.exec(func1,10);threadpool.exec(func2,20,"FLY.");threadpool.exec(1000,func3);threadpool.waitForAllDone();threadpool.stop();
}int func1_future(int a)
{ cout << "func1() a=" << a << endl; return a;
}string func2_future(int a, string b)
{ cout << "func2() a=" << a << ", b=" << b<< endl; return b;
}void test02()
{cout<<"test 02"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池future<decltype(func1_future(0))> ret01=threadpool.exec(func1_future,10);future<string> ret02=threadpool.exec(func2_future,20,"FLY.");threadpool.waitForAllDone();cout<<"ret01 = "<<ret01.get()<<endl;cout<<"ret02 = "<<ret02.get()<<endl;threadpool.stop();}class Test{
public:int test(int a){cout<<_name<<": a = "<<a<<endl;return a+1;}void setname(string name){_name=name;}string _name;};void test03()
{cout<<"test 03"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池Test t1;Test t2;t1.setname("Test 1");t2.setname("Test 2");auto f1=threadpool.exec(bind(&Test::test,&t1,placeholders::_1),10);auto f2=threadpool.exec(bind(&Test::test,&t2,placeholders::_1),20);threadpool.waitForAllDone();cout<<"f1 = "<<f1.get()<<endl;cout<<"f2 = "<<f2.get()<<endl;threadpool.stop();}int main(int argc,char **argv)
{// 简单测试线程池test01();// 测试任务函数返回值test02();// 测试类对象函数的绑定test03();return 0;
}
执行结果:
test 01
func1() a=10
func2() a=20, b=FLY.
func3
test 02
func1() a=10
func2() a=20, b=FLY.
ret01 = 10
ret02 = FLY.
test 03
Test 1: a = 10
Test 2: a = 20
f1 = 11
f2 = 21
四、源码地址
源码已经上传github。
总结
线程池的核心:初始化、线程启动、执行函数、线程停止。

相关文章:
C++实现线程池
C实现线程池一、前言二、线程池的接口设计2.1、类封装2.2、线程池的初始化2.3、线程池的启动2.4、线程池的停止2.5、线程的执行函数run()2.6、任务的运行函数2.7、等待所有线程结束三、测试线程池四、源码地址总结一、前言 C实现的线程池,可能涉及以下知识点&#…...
2023最新Java面试手册(性能优化+微服务架构+并发编程+开源框架)
Java面试手册 一、性能优化面试专栏 1.1、 tomcat性能优化整理 1.2、JVM性能优化整理 1.3、Mysql性能优化整理 二、微服务架构面试专栏 2.1、SpringCloud面试整理 2.2、SpringBoot面试整理 2.3、Dubbo面试整理 三、并发编程高级面试专栏 四、开源框架面试题专栏 4.1、Sprin…...
对灵敏度分析技术进行建模(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
完整爬虫学习笔记(第一章)
文章目录前言:fu:. 爬虫概述:hotdog:原理解剖:one: 服务器渲染:two: 前端JS渲染:fire: 第一个爬虫程序案例总结前言 最近正在学习Python网络爬虫的相关知识,鉴于本人Python水平有限 , 对Python并无太深的理解,所以此文章的主要目的在于抛砖引玉…...
会计师项目管理软件是什么,哪些必不可少的功能
欢迎阅读现代金融专业人士的会计师项目管理指南。在本文中,我们将深入探讨在基于项目的会计的各个方面使用项目管理方法的好处。我们还将教您面临哪些挑战以及如何为您的团队选择最佳工具。 为什么会计师的项目管理很重要? 在会计方面,目标始…...
第 8 章 优化
目录 8.1 优化概述 8.2 优化 SQL 语句 8.3 优化和指标 8.4 优化数据库结构 8.5 优化 InnoDB 表 8.6 优化 MyISAM 表 8.7 内存表的优化 8.8 了解查询执行计划 8.9 控制查询优化器 8.10 缓冲和缓存 8.11 优化锁定操作 8.12 优化 MySQL 服务器 8.13 衡量性能ÿ…...
剑指offer -- java题解
剑指offer -- java题解刷题地址1、数字在升序数组中出现的次数2、二叉搜索树的第k个节点3、二叉树的深度4、数组中只出现一次的两个数字5、和为S的两个数字6、左旋转字符串7、滑动窗口的最大值8、扑克牌顺子9、孩子们的游戏(圆圈中最后剩下的数)10、买卖股票的最好时机(一)刷题…...
若依ruoyi——手把手教你制作自己的管理系统【二、修改样式】
阿里图标一( ̄︶ ̄*)) 图片白嫖一((* ̄3 ̄)╭ ********* 专栏略长 爆肝万字 细节狂魔 请准备好一键三连 ********* 运行成功后: idea后台正常先挂着 我习惯用VScode操作 当然如果有两台机子 一个挂后台一个改前端就更好…...
2023.2.14每日一题——455. 分发饼干
每日一题题目描述解题核心解法一:双指针题目描述 题目链接:455. 分发饼干 假设你是一位很棒的家长,想要给你的孩子们一些小饼干。但是,每个孩子最多只能给一块饼干。 对每个孩子 i,都有一个胃口值 g[i],…...
MySQL入门篇-MySQL常用字符函数小结
备注:测试数据库版本为MySQL 8.0 这个blog我们来聊聊常见的字符函数 函数名函数用途UPPER()返回大写的字符LOWER()返回小写的字符LTRIM()左边去掉空格TRIM()去掉空格RTRIM()右边去掉空格SPACE()返回指定长度的空格CONCAT()连接字符串CONCAT_WS()指定分隔符连接字符串CHAR_LEN…...
解决不同影像裁剪后栅格数据行列不一致问题
前言在处理栅格数据时,尽管用同一个矢量文件裁剪栅格数据,不同数据来源的栅格行列数也会出现不一致的情况。如果忽略或解决不好,会导致后续数据处理出现意想不到的误差或错误,尤其是利用编程实现数据处理时。因此,应当…...
visual studio2022配置opencv
标题:在vs下配置使用opencv 流程: 1、下载安装opencv 2、添加环境变量 3、vs中配置属性 4、使用 5、可能遇到的报错和解决 1、 下载安装opencv 官网下载地址: https://opencv.org/releases/ 我这里是windows环境,所以选择点击w…...
什么是销售管理?销售管理的五大职能
销售管理听起来很简单,似乎只是负责销售并确保客户满意,但事实上,它远不止于此。 销售管理的实际职能包括监督销售团队的工作,制定计划和设定目标,通常还包括确保销售流程的效率以获得最佳业务结果。 什么是销售管理…...
[CVPR‘22] EG3D: Efficient Geometry-aware 3D Generative Adversarial Networks
paper: https://nvlabs.github.io/eg3d/media/eg3d.pdfproject: EG3D: Efficient Geometry-aware 3D GANscode: GitHub - NVlabs/eg3d总结: 本文提出一种hybrid explicit-implicit 3D representation: tri-plane hybrid 3D representation,该方法不仅有…...
Learning C++ No.9【STL No.1】
引言: 北京时间:2023/2/13/18:29,开学正式上课第一天,直接上午一节思想政治,下午一节思想政治,生怕我们……,但,我深知该课的无聊,所以充分利用时间,把我的小…...
Apifox推荐-django后台验证token配置
最近事情很多,但是我还是想写一片推荐apifox的文章。 优秀的UI,清晰地逻辑,丰富的功能。对于我们这种业余选手来说,他真的很便利。 更新新版后有了更多贴心的功能,让你感觉他是一个有温度的工具。 最重要的是…...
SAS应用入门学习笔记6
SQL (SAS): Features: 1)不需要在每个query中重复调用每个SQL; 2)每个statement都是独立去完成的; 3)我们是没有proc print和proc sort语句的;(order by) key synta…...
【3D目标检测】Pseudo-Stereo for Monocular 3D Object Detection in Autonomous Driving
目录概述细节背景与整体流程图像级别生成特征级别生成损失函数学习深度感知的特征概述 本文是基于单目图像的3D目标检测方法。 【2021】【MonoDLE】 研究的问题: 能否借助立体图像检测算法提高单目图像检测的效果如何实现右侧图像的生成 解决的方法: 受启发于伪…...
git 常用命令之 git branch
大家好,我是 17。 新建 git 分支 分支是并行开发的基础。分支名称的本质是对分支最后一个提交的引用。分支有多个,但 HEAD 只有一个,可以认为 HEAD 是"current branch"(当下的分支)。当你用git switch切换分支的时候,…...
Oracle数据泵
Oracle 数据泵:概览 作为一个基于服务器的用于高速移动数据与元数据的工具, Oracle 数据泵具有以下特点: •可通过 DBMS_DATAPUMP 调用 •可提供以下工具: – expdp – impdp – 基于 Web 的界面 •提供四种数据移动方法ÿ…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...
Ascend NPU上适配Step-Audio模型
1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统,支持多语言对话(如 中文,英文,日语),语音情感(如 开心,悲伤)&#x…...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具
第2章 虚拟机性能监控,故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令:jps [options] [hostid] 功能:本地虚拟机进程显示进程ID(与ps相同),可同时显示主类&#x…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...
Reasoning over Uncertain Text by Generative Large Language Models
https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...
【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
Golang——7、包与接口详解
包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...
