【Linux】详解线程第三篇——线程同步和生产消费者模型
线程同步和生消模型
- 前言
- 正式开始
- 再次用黄牛抢票来讲解线程同步的思想
- 通过条件变量来实现线程同步
- 条件变量接口介绍
- 初始化和销毁
- pthread_cond_wait
- signal和broadcast
- 生产消费者模型
- 三种关系
- 用基本工程师思维再次理解
- 基于生产消费者模型的阻塞队列
- 版本一
- 版本二
- 多生多消
- 利用RAII来对锁进行优化
前言
本篇线程同步的内容是完全基于线程互斥来讲的,如果屏幕前的你对于线程互斥还不是很了解的话,可以看看我上一篇博客:【Linux】详解线程第二篇——用黄牛抢陈奕迅演唱会门票的例子来讲解【 线程互斥与锁 】
正式开始
上篇线程互斥中重点讲了互斥锁,虽然解决了多线程并发导致的临界资源不安全的问题,但是还存在一个比较重要的问题:访问临界资源合理性的问题。
再次用黄牛抢票来讲解线程同步的思想
再举一下我上篇博客中黄牛抢票的例子。
上一篇博客的例子中,只有黄牛和票这两个元素,对应的就是线程和临界资源,既然互斥锁已经讲了,那么就能多一个锁这个元素了,也就可以理解为多了一个售票员。所以这里一共三个主要元素:黄牛(线程)、票(临界资源)、售票员(锁)。
前面博客中我未加usleep对黄牛买票进行限制的例子中,出现了一个黄牛将所有票抢完的例子,也就是说整个流程中,只有一个线程对临界资源进行了访问,其他的线程虽然想要访问临界资源但是都没有访问到,这种情况不能说有错误,只能说设计的不合理,会造成其他线程的饥饿问题,一个人把所有的活全包了,其他人挣不到钱,就没饭吃了,虽然这个例子有点极端。但是确实是一个问题。
还有一个问题,假如此刻票被抢完了,但是票卖完后隔一段时间还会再次补票,但是无法确定票补充的时间,会在随机时刻进行补发(临界资源未准备就绪,但有可能随意时刻准备好)。这样的话,所有的黄牛都想抢票,但是都不知道什么时候会补票,于是所有的黄牛无时无刻都在问售票员票是否补发,这样的话,就会很浪费所有黄牛和售票员的时间,也即浪费所有线程向锁申请资源的时间,导致运行效率下降。虽然没做错,但是不合理。
上面这两个问题都是访问临界资源的合理性的问题。而引入线程同步就是为了解决这个问题。
想一想我们现实生活中是怎样售票的,当票卖完后,我们不需要一直询问售票员是否有票,只要等待通知即可,比如说12306,补到票了会通知你,当你进行补票的时候,是会排队的,也就是为什么你点击补票会显示当前人数是多还是中等还是少的信息。如果你补的比较早,那么你排队就会靠前一点,同理,补得比较晚,就会排的靠后一点。这里最重要的一点就是排队,使得整个流程有了一定的顺序。也就是让所有的线程按照一定的顺序去访问临界资源。这里排队可以解决一部分问题,也就是不断询问是否有票的问题。
那么还有一个黄牛将所有的票抢完的问题。再拿12306来说,我们每个人只能买一张票,如果想买多张,就要多给一个乘员信息,比如说你给你自己买了票,还想给你朋友买票的话,得要你朋友的身份证号码等信息。也就票和人是一对一的。那这里搞得极端一点,规定每次让黄牛只能买一张票,买完票后禁止继续买票,再加上上面的队列,如果卖完票的黄牛还想再买票,就必须去队尾重新排队购买。
上面其实已经把线程同步的思想讲解出来了,不过都是以黄牛的角度来说的,这里改口成线程来总结一下:
- 所有的线程想要访问临界资源时,都必须排队。
- 访问完临界资源的线程,若想要继续对临界资源进行访问,就必须跑到队尾等待其前面的线程访问完才能轮到当前线程。
这样让访问临界资源的线程,按照一定的顺序进行临界资源的访问,就是线程同步最重要的思想。
那么如何实现线程同步呢?
通过条件变量来实现线程同步
我们在申请临界资源前,先要做临界资源是否存在的检测,而检测也是需要访问临界资源的,那么对临界资源的检测也是一定需要在加锁和解锁之间的。
我把前面博客中的的例子改一改,加上补票机制:
#include <iostream>
using namespace std;#include <pthread.h>
#include <unistd.h>
#include <ctime>#include <string>// 线程数据,包含线程名字,也就是黄牛名字,还有线程对应的互斥锁
class ThreadData
{
public:ThreadData(const string& name, pthread_mutex_t* pmtx):_name(name),_pmtx(pmtx){}public:string _name;pthread_mutex_t* _pmtx;
};// 黄牛人数
#define THREAD_NUM 5// 陈奕迅演唱会的票数tickets
int tickets = 1000;// 黄牛抢票执行的方法
void* getTicket(void* arg)
{ThreadData* ptd = (ThreadData*)arg;// 每个黄牛疯狂抢ticketswhile(1){pthread_mutex_lock(ptd->_pmtx);if(tickets > 0){usleep(rand() % 2000);tickets--;printf("%s抢到票了, 此时ticket num ::%d\n", ptd->_name.c_str(), tickets);// 当对临界资源访问完后就解锁pthread_mutex_unlock(ptd->_pmtx);}else{// 当对临界资源访问完后就解锁,这里是当tickets == 0的情况,也要解锁pthread_mutex_unlock(ptd->_pmtx);printf("%s没抢到票, 票抢空了\n", ptd->_name.c_str());// 这里不再让黄牛抢不到票就退出,而是继续检查是否有票}// 抢到或没抢到票都执行一下后续动作,这里直接用usleep替代usleep(rand() % 5000);}// 记得要释放掉线程数据,不然内存泄漏delete ptd;return nullptr;
}int main()
{// 局部锁pthread_mutex_t mtx;// 默认给空就行pthread_mutex_init(&mtx, nullptr);// 种一颗随机数种子srand((unsigned int)time(nullptr) ^ getpid() ^ 0x24233342);// 假设此时有三个黄牛进行抢票pthread_t tid[THREAD_NUM];for(int i = 0; i < THREAD_NUM; ++i){string tmp;tmp += to_string(i + 1) + "号黄牛";ThreadData* ptd = new ThreadData(tmp, &mtx);// 每个黄牛去抢票pthread_create(tid + i, nullptr, getTicket, (void*)ptd);}// 补票机制while(1){if(tickets == 0){cout << "票抢光了,准备补票中..." << endl;sleep(rand() % 10);tickets = 500;break;}sleep(1);}// 等待每个黄牛抢完票后退出for(int i = 0; i < THREAD_NUM; ++i){pthread_join(tid[i], nullptr);}pthread_mutex_destroy(&mtx);return 0;
}
运行:
我截了三张图,一张是第一次票抢光之后准备补票:
第二张是票补上了之后:
第三张是补票抢完之后:
这里可以发现,当票抢光之后,5个线程一直在查询当前是否有票,正如开始所说的那样。
这里的代码,会在 if(tickets > 0) 前进行加锁,因为tickets > 0就访问了临界资源,没票之后会补票,但是线程不会退出,而是一直在查询是否补票,这样效率就很低,一直循环着申请锁和解锁,虽然没错,但是不合理。故这种常规方式检测条件也就注定了其必须频繁的申请加锁和解锁。
但是我们可以改一改:
- 不要让线程再频繁的去自己检测临界资源是否准备就绪,如果未准备就绪就让当前线程等待,也就是进入S状态。
- 当临界资源就绪的时候,再唤醒想要访问临界资源的线程。
这样效率就会大大提高。
想要实现这个功能的话,可以通过条件变量来实现。
条件变量接口介绍
条件,condition,pthread库中用起来条件变量的接口都是中间加上cond。
初始化和销毁
定义一个条件变量和定义一个锁一样,也分全局和局部,初始化也是全局可以直接用宏,局部要用init:
全局初始化的宏:
局部初始化的时候第一个参数就是条件变量的地址,第二个参数给空让它以默认方式初始化就行。
还是,以pthread开头的接口返回值绝大部分都是正确返回0,错误返回错误码。
destroy,就是销毁条件变量,没啥好讲的。
pthread_cond_wait
就是传一个条件变量指针,然后一个锁,为啥要传一个锁后面再说。只要调用了这个函数的线程就会进入阻塞状态,也就是从运行队列进入了等待队列中。这里就相当于是黄牛开始排队了,而且注意,这里的队伍是在等待cond这个条件准备就绪,也就是说,这里的队伍是专门为cond开辟的一个队,不同于普通的等待队列。
这里就相当于是前面的if判断资源是否存在了,但是是直接让线程进入等待队列中。
还有一个timewait,多了一个参数,前两个参数和wait一样,第三个参数是一个时间,让线程等待时,当第三个参数一到会自动醒来。
signal和broadcast
上面的pthread_cond_wait是让线程进入到与cond相关的等待队列中,当signal被调用时,就会有一个线程出队,就相当于是等待资源准备就绪了,此时就会唤醒一个线程。不过这里前提是signal的参数中的cond要和wait的参数中的cond指向是一样的。不匹配就无法唤醒wait对应cond的队列中的线程。broadcast,广播的意思,当调用这个函数时,会将cond对应等待队列中的所有线程都唤醒,此时所有的线程会按照顺序出队。
下面我写一个全新的例子来演示一下这里的cond用法:
// 线程个数
const static int THREAD_NUM = 4;// 不同线程的执行方法
/**********************************************************************************/
void Thread_1_Func(const string& name)
{cout << name << "is doing" << " 加密工作" << endl;
}void Thread_2_Func(const string& name)
{cout << name << "is doing" << " 持久化工作" << endl;
}void Thread_3_Func(const string& name)
{cout << name << "is doing" << " 查询工作" << endl;
}void Thread_4_Func(const string& name)
{cout << name << "is doing" << " 管理工作" << endl;
}
/**********************************************************************************/// 函数指针,指向线程所要执行的函数
typedef void(*pfunc)(const string& name);// 每个线程最有用的数据
class ThreadData
{
public:ThreadData(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond, pfunc pf):_name(name),_pmtx(pmtx),_pcond(pcond),_pf(pf){}public:string _name; // 线程名pthread_mutex_t* _pmtx; // 锁pthread_cond_t* _pcond; // 条件变量pfunc _pf; // 回调函数
};// 判断线程是否退出
static bool quit = false;// pthread_create的回调方法
void* ThreadRoutine(void* arg)
{ThreadData* ptd = (ThreadData*)arg;while(!quit){// 访问临界资源前上锁pthread_mutex_lock(ptd->_pmtx);// 相当于if判断,此时线程直接阻塞pthread_cond_wait(ptd->_pcond, ptd->_pmtx);if(!quit){// 去调用线程对应的方法ptd->_pf(ptd->_name);}else{// 退出cout << ptd->_name << " quit" << endl; }// 访问完后解锁pthread_mutex_unlock(ptd->_pmtx);}// 记得释放传来的对象,不然内存泄漏了delete ptd;ptd = nullptr;return nullptr;
}int main()
{// 局部条件变量pthread_cond_t cond;pthread_cond_init(&cond, nullptr);// 局部锁pthread_mutex_t mtx;pthread_mutex_init(&mtx, nullptr);// 各个函数执行的方法pfunc funcs[THREAD_NUM] = { Thread_1_Func, Thread_2_Func, Thread_3_Func, Thread_4_Func};// 多个线程pthread_t tids[THREAD_NUM];// 创建THREAD_NUM个线程for(int i = 0; i < THREAD_NUM; ++i){ThreadData* tmp = new ThreadData(to_string(i + 1) + "号线程", &mtx, &cond, funcs[i]);pthread_create(tids + i, nullptr, ThreadRoutine, (void*)tmp);}cout << "prepare to do the jobs" << endl;sleep(1);cout << "start doing jobs" << endl;// 发signal,让等待队列中的线程执行其方法int count = 0;while(count != 8){pthread_cond_signal(&cond);++count;sleep(1);}quit = true;// quit改为true时,其他线程已经在等待队列中了,//得让各个线程都执行其一次方法才会循环上去判断quit改变了pthread_cond_broadcast(&cond);cout << "jobs done" << endl;for(int i = 0; i < THREAD_NUM; ++i){pthread_join(tids[i], nullptr);}return 0;
}
上面的代码中,各线程执行pthread_cond_wait就会进入cond对应的等待队列中,当main线程执行一次pthread_cond_signal就会唤醒一个进程。
运行起来:
这里用的是signal来唤醒各个线程的,还可以用broadcast:
这样一次就会唤醒一批线程。
运行:
生产消费者模型
带着两个问题来讲一讲生产消费者模型:
- 条件满足的时候我们再唤醒指定的线程,但是我们怎么知道条件是否满足?
- 互斥所在线程同步中的意义,以及为何将pthread_cond_wait_wait放在加锁和解锁之间?
学习生产消费者模型可以帮助我们解决第一个问题。
在编写生产消费者模型的某种场景的代码时,可以帮助我们理解互斥锁的意义。
下面来举一个生活中的生产消费者模型的例子。
现在有一个超市,屏幕前的你是一名消费者。
超市中的商品,并不是由超市直接提供的,而是由供应商提供的,超市本质上是一个商品缓冲区。如图:
上图中,有【两种角色:消费者和生产者。一个交易场所。】
我们买东西时不会去供应商处买,而是去超市,这样能够提高效率。假如说消费者想要买一包方便面(下面都以方便面来说),如果直接去供应商那里买的话,供应商还要开机器给你做一包,这样对于供应商来说成本太高了,一包就得开一次机器,电费都比那一包方便面贵,没这个必要。直接一次性生产一大堆,然后提供给超市,这样,想要买方便面的消费者就去超市。这样就不会浪费那么多的人力和物力去一包一包的生产方便面。
有了超市,供货商只负责生产,不用为消费者准备东西。消费者只需要去超市,不用再跑到供货商处,这样逻辑上就实现了解耦。
三种关系
来说说各角色间的关系。
- 生产者和生产者
假设图中的供应商都生产的是同种资源,比如说都生产的是方便面,不过品牌不一样,这家是康师傅的,这家是今麦郎的……,那么各个生产者之间就是竞争关系,用计算机术语来说的话,生产者和生产者之间是互斥关系。
- 消费者和消费者
如果说疫情前快要封城了,所有的居民都要去超市抢购物资,此时超市的方便面已经快被抢空了,极端点,就剩一包了,那么有很多的居民都想要这一包方便面,居民也就是消费者,此时的消费者和消费者之间就是竞争关系,都去竞争那一包方便面,还是用计算机术语来说。在资源很少而需要同种资源的消费者很多的情况下,消费者之间是存在互斥关系的。
上面这两种互斥还可以这样来解释:
超市的空间是一个共享资源,比如说某一货架,不能让供应商全部都抢着去摆放资源,这样物品会放乱,同种不同类的方便面都摆放在一起,这样会造成混乱。也不能让顾客去某一空间抢着争夺资源,这样可能会导致消费者本来想拿康师傅但拿成了今麦郎。所以必须要保证生产者生产的过程是安全的,消费者消费的过程也要是安全的。
- 故生产者之间是互斥关系,消费者之间也是互斥关系。
- 生产者和消费者
生产者生产时不能让消费者消费,不然数据未传输完毕时,部分数据已经被消费者拿走,且消费者不会再次消费,这样就会导致生产者和消费者数据不一致问题。比如说一包方便面只造了不到一半就被消费者拿走,此时生产者仍然在生产,而且从开始到结束不会停止,知道生产完整一包方便面才停止,所以生产者所知道的信息是其会生产一整包方便面,但是消费者把一半拿走了,消费者得到的信息是其只拿到了半包方便面,此时就可以说二者的数据不一致。所以说生产者和消费者之间存在互斥关系。
再比如说过年期间,超市生意非常好,所有的居民都忙着进年货,当超市中的某一种商品被抢空时就要通知对应的生产者去生产,如果未通知就会出现本篇刚开始将的抢票问题,会不断有消费者询问是否有商品被补充。同样,当超市商品已摆放满了,也要通知生产者停止生产。故当缺资源的时候通知生产者生产,当资源补充上来时就通知消费者消费;当资源盈余时就通知生产者停止生产,同时刺激消费者消费。即同步。所以生产者和消费者之间也存在同步关系。
所以说生产者和消费者之间存在同步和互斥两种关系。
那么我们写代码的时候怎么编写一个生产消费者模型呢?
只需要掌握住三个原则即可。
- 一个交易场所。
- 两个角色——生产者和消费者。
- 三种关系,生生(互斥)、消消(互斥)、生消(同步和互斥)。
不过第三点有个特例,当只有一个生产者 和 一个消费者时就只需要维护生消这一种关系,这一点后面会再谈,这里先暂不考虑。
通过锁和条件变量来体现出三种关系。
用基本工程师思维再次理解
代码中我们要用线程来体现出生产者和消费者,也就是要给线程进行角色划分。
超市是用某种数据结构来表示的缓冲区。
商品即某些数据。
超市里是否有新增货物,生产者最清楚。因为生产者一成功生产就会新增货物。
超市里剩余多少空间让生产者生产,消费者最清楚。因为消费者每次消费都会新增空余空间。
所以这里就可以解决最初说的第一个问题:条件满足的时候我们再唤醒指定的线程,但是我们怎么知道条件是否满足?
当生产者生产商品后就可立马通知消费者来消费,消费者将商品拿走后就可通知生产者去生产。故可以让生产者线程和消费者线程互相同步完成对应的生产消费者模型。这句话中的通知就是唤醒。加粗的字样就是对这个问题的解答。
再来看一个图:
这里要强调一点,生产和消费的过程不仅仅是从生产者生产到仓库再让消费者拿走。重要的点不在这,而是生产者从获取到数据开始生产和消费者拿掉数据开始处理的两个过程,中间传递数据的过程耗时是非常短的,至于为什么等会写代码的时候就知道了。
我前面讲进程间通信的时候说过:进程间通信的本质是让两个进程看到同一块空间。
校正一下:进程间通信的前提是让两个进程看到同一块空间,进程间通信的本质就是生产消费者模型。比如说管道,自带同步和互斥的属性,正常情况下一端写一端读,当管道为空的时候读端会阻塞,当管道满的时候写端会阻塞。这里就和生产消费者模型很像,写端读入数据后就让读端去读,读端读好了数据后就让写端去写。当写端关闭时读端就相当于读到了文件末尾,read会返回零;当读端关闭的时候写端直接出错,进程就退出了。
【注】如果我这里讲的内容你不太会的话,可以看看我这篇博客:【Linux】进程间通信(匿名管道、命名管道、共享内存等,包含代码示例)
基于生产消费者模型的阻塞队列
下面就来写写生产消费者模型的代码。
我会写两个版本,第一个版本细节比较少,第二个版本会基于第一个版本稍微进行一点优化。
版本一
先来说说大致思路:
-
一个交易场所,前面超市的那个例子说了,超市就是用某个数据结构表示的缓冲区,这里我就以队列来表示这个缓冲区,不过我不会再自己手搓一个队列,直接用STL库的那个,如果有同学不懂STL的队列,可以看看这篇:【C++】STL栈和队列基本功能介绍、题目练习和模拟实现(容器适配器)。
-
两种角色,生产者和消费者的表示,这里我就先来简单一点的,一个生产者的线程和一个消费者的线程。后面的那个例子再来多个生产者和消费者。
-
3种关系中,生生、消消、生消都要保持两两互斥,生消还要有一个同步的关系。我们可以用一个类来实现,其中可以用一个锁来表示所有的互斥,用两个条件变量来表示生消的同步,这个类还可以将第一点中的队列包含在在内。
先把这个类简单给出来:
template<class T>
class CPqueue
{
public:// 构造CPqueue(int capacity): _capacity(capacity){// 互斥锁、条件变量都要用接口来初始化pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}private:// 用队列这个数据结构来表示std::queue<T> _q;// 超市中能够存放的最大容量int _capacity;// 判断超市是否已经放满了pthread_cond_t _full;// 判断超市是否是空的pthread_cond_t _empty;// 互斥锁,用来两两互斥pthread_mutex_t _mtx;
};
上面的两个条件变量,一个是判断当前队列中存放的数据满了没有,一个是判断当前队列中是否空了。
如果满了,就得让生产者线程阻塞,不能继续生产了,并通知消费者来消费,等接到生产信号了再去生产;
如果空了,就得让消费者线程阻塞,不能继续消费了,并通知生产者去生产,等接到消费信号了再去消费。
而这里对应的生产的动作就是往队列中push数据,消费的动作就是从队列中pop数据。
所以要提供两个接口,一个是让生产者push的,一个是让消费者pop的。
下面的代码是在CPqueue中的:
// 判断队列是否为空
bool IsEmpty()
{return _q.size() == 0;
}// 判断队列是否已满
bool IsFull()
{return _q.size() == _capacity;
}void PushData(const T& data)
{// 进来先上锁pthread_mutex_lock(&_mtx);/* 上了锁之后先判断临界资源是否准备就绪,也就是队列是否满了*/if(IsFull()) pthread_cond_wait(&_full, &_mtx);// 到此处就说明队列不满,就可以push数据了_q.push(data);// push完了先发送让消费者消费的信号pthread_cond_signal(&_empty);// 解锁pthread_mutex_unlock(&_mtx);
}void PopData(T& data)
{// 先上锁pthread_mutex_lock(&_mtx);/* 上锁后,先判断临界资源是否准备就绪,也就是队列是否为空*/if(IsEmpty()) pthread_cond_wait(&_empty, &_mtx);// 到此处说明队列不为空,就可以pop了data = _q.front(); // 先拿数据再pop_q.pop();// pop完了发送让生产者生产的信号pthread_cond_signal(&_full);// 解锁pthread_mutex_unlock(&_mtx);
}
这里就是同步和互斥的逻辑。包含了一个交易场所和三种关系中的生消同步和互斥。
下面来写生产者和消费者的线程:
// 生产者执行的方法
void* Productor(void* arg)
{CPqueue<int>* pq = (CPqueue<int>*)arg;int data = 10;while(1){std::cout << "productor send data ::" << data << std::endl;pq->PushData(data);data++;sleep(1);}
}// 消费者执行的方法
void* Consumer(void* arg)
{CPqueue<int>* pq = (CPqueue<int>*)arg;while(1){int data = 0;pq->PopData(data);std::cout << "consumer get data ::" << data << std::endl << std::endl;}
}int main()
{CPqueue<int>* pq = new CPqueue<int>(6);// 消费者线程pthread_t consumerThread;// 生产者线程pthread_t productorThread;// 消费者线程初始化pthread_create(&consumerThread, nullptr, Consumer, (void*)pq);// 生产者线程初始化pthread_create(&productorThread, nullptr, Productor, (void*)pq);// 等待从线程退出pthread_join(consumerThread, nullptr);pthread_join(productorThread, nullptr);delete pq;return 0;
}
运行起来效果就是这样的:
还可以控制消费者的消费时间点,比如说队列满了再让消费者消费:
运行:
同样的还可以队列装一半了再退出,这里就不演示了。
这里可以说一下pthread_cond_wait的作用了:
-
pthread_cond_wait是在临界区中的,来一个问题:说如果线程等待了,会持有锁等待吗?
答案是不会的,pthread_cond_wait的第二个参数是一把锁的指针,意义在于,当pthread_cond_wait调用成功后,传入pthread_cond_wait的锁会被自动解开,所以不用担心线程在pthread_cond_wait的时候会带锁wait。
线程阻塞并恢复之后,从哪里阻塞就会从哪里唤醒,也就是pthread_cond_wait这里,当线程被唤醒时,pthread_cond_wait会自动帮助线程申请其本来调用pthread_cond_wait成功时解开的那把锁,因为县城被唤醒时仍然在临界区中。
不过这里唤醒有多种情况,就先说一个生产者和一个消费者的。
比如说我这里代码写的是先唤醒对方线程,再解锁:
当另一方醒来时wait会自动申请锁,但此时锁是被当前方的线程占用的,所以另一方又会在锁上面进行阻塞等待(申请锁时,若锁被占用,则当前线程就阻塞等待锁资源),等到当前方解锁后,另一方就会自动拿到锁。
先解锁后唤醒时,比如这样:
此时就是另一方被唤醒时锁未被占用,那么就会直接得到锁。
这里是单生产者和单消费者的,还有多生产者和多消费者的情况,同理,不过是在锁上等待的线程会更多,这里就不再多讲了。
需要注意的是,唤醒和解锁的先后顺序是都可以的,只要发生了生成——消费这一行为即可。
不过我个人更推荐先唤醒后解锁。
前面说了,生产消费者模型能够提高工作效率。如果单从生产 -> 消费的角度来看问题的话,其实这里并没有什么效率上的提高。因为这一步只是进行了简单的拷贝而已,真正的效率提高在生产者和消费者可同时工作这一点上。生产者生产完后,消费者去取数据并执行后续操作,在这个过程中,生产者还可以继续接受生产的任务,比如网络发来的数据 / 标准输入的数据,接收好就送到缓冲区(超市)中,重在二者可以并发执行。就像餐馆里面的大厨和服务员一样,大厨做好饭后不需要亲自将饭送到餐桌上,而是让端盘的服务员去送,服务员送的同时,大厨仍可继续做饭,服务员送完菜后也可
处理顾客的其他要求(拿酒、盛米饭等)。这样服务员和大厨间的工作是互不影响的,效率就高了。
真正的时间消耗在生产者获取到数据过程 和 消费者获取到数据后的后续处理过程,中间的拷贝耗时相对来说是非常短的,当消费者的后续处理动作很耗时时,可以搞多个消费者线程并发执行该动作(总线程个数尽量不要超过CPU核数,相等最好),这样效率会更高。
版本二
- pthread_cond_wait是一个函数,只要是一个函数就可能会调用失败,拿push来说:
if(IsFull()) pthread_cond_wait();
pthread_cond_wait是有返回值的,是一个int,调用失败了返回的是一个错误码。
如果这里pthread_cond_wait调用失败了,那就可能导致队列是满的但是代码仍向下执行了,就会超出规定的队列容量(STL的queue中会自动扩容,我们写的那个capacity只是我们自己规定的,如果出现不合法的行为也是不好检测的),那么这样就不合理了。pthread_cond_wait还可能存在伪唤醒的情况,意思就是条件变量_full/_empty并不满足条件但是当前线程被唤醒了。比如若其他线程误操作发送了信号,就会导致当前线程跳出其所阻塞的队列中,并进行后续操作,也是不合理的。
所以要把if改一改,换成while:
这样就算调用失败了,或者伪唤醒了,都会再上去判断一次IsFull(),此时如果队列还是满的,就会再次调用pthread_cond_wait,若还失败了,又会上去再判断……知道真正被唤醒并且队列不是满的了就会跳出while循环,这里push是这样,pop也是同理的:
故当跳出while后,后面的代码是一定能正确的进行生产和消费的。
还有这里的阻塞队列不用关心哪一个线程先执行。
如果是生产者先执行的话,就直接生产即可。
如果是消费者先执行的话,队列为空,会阻塞住,此时就会还会让生产者来生产。
所以逻辑上,一定是能让生产者先往队列中生产者东西的。
注意我上面代码中用了模版,而且上面演示的类型是int,这里我要改一改了,改成一个自定义类型。简单实现一个计算器,其成员变量为两个值,一个function包装器(如果有屏幕前的你对包装器不了解的话,可以看这篇:【C++】C++11中比较重要的内容介绍):
// 对function包装器重命名一下
typedef std::function<int(int, int)> func;class Caculator
{
public:Caculator(){}Caculator(int x, int y, func fun): _x(x), _y(y), _fun(fun){}int operator()(){return _fun(_x, _y);}public:int _x;int _y;func _fun;
};
阻塞队列的代码不变。线程执行方法变一下:
// 存放各个函数的做法和接口
std::vector<std::pair<char, func>> kv;// 这里搞了4个计算功能,加减乘除
const int KVSIZE = 4;// 生产者执行方法
void* Productor(void* arg)
{CPqueue<Caculator>* pq = (CPqueue<Caculator>*)arg;while(1){// 随机分配给消费者任务int task = rand() % KVSIZE;func fun = kv[task].second;int x = rand() % 200;int y = rand() % 500;std::cout << "productor send task ::" << task << "-->" << x << (kv[task].first) << y << '=' << '?' << std::endl;pq->PushData(Caculator(x, y, fun));sleep(1);}
}// 消费者执行方法
void* Consumer(void* arg)
{CPqueue<Caculator>* pq = (CPqueue<Caculator>*)arg;while(1){Caculator tmp;pq->PopData(tmp);std::cout << "consumer get task ::" << tmp() << std::endl << std::endl;}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x323424);func MyAdd = [](int x, int y){ return x + y; };func MySub = [](int x, int y){ return x - y; };func MyMul = [](int x, int y){ return x * y; };func MyDiv = [](int x, int y){ return x / y; };// 这里用到了lambda表达式,如果屏幕前的你不懂,可以看看我刚刚给的那个链接kv.push_back(std::pair<char, func>('+', MyAdd));kv.push_back(std::pair<char, func>('-', MySub));kv.push_back(std::pair<char, func>('*', MyMul));kv.push_back(std::pair<char, func>('/', MyDiv));CPqueue<Caculator>* pq = new CPqueue<Caculator>(4);// 消费者线程pthread_t consumerThread;// 生产者线程pthread_t productorThread;// 消费者线程初始化pthread_create(&consumerThread, nullptr, Consumer, (void*)pq);// 生产者线程初始化pthread_create(&productorThread, nullptr, Productor, (void*)pq);pthread_join(consumerThread, nullptr);pthread_join(productorThread, nullptr);delete pq;return 0;
}
再加上阻塞队列,运行起来就是这样:
这里除法有一丁点问题,我就不改了,各位想改的自己动手试试,改正浮点数就行。
多生多消
这里搞简单点,就两个生产者、两个消费者:
其它代码就只用改消费者打印换行一次,所有从线程打印一下线程id就行。
运行:
可以看到,有两个消费者和两个生产者,成功。
这里就和线程池有点像了,不过我不打算在这篇讲线程池,下一篇再详谈。
利用RAII来对锁进行优化
RAII,学过智能指针的同学应该知道是啥,如果你不懂,看这篇:【C++】智能指针。
这里写一个类,专门用来管理锁资源:
class LockGuard
{
public:LockGuard(pthread_mutex_t* pmtx):_pmtx(pmtx){pthread_mutex_lock(_pmtx);}~LockGuard(){pthread_mutex_unlock(_pmtx);}public:pthread_mutex_t* _pmtx;
};
再加锁的时候就不需要调用pthread库中的函数了,直接定义一个局部的对象就行,定义时自定调用构造,就会进行加锁,析构就会调用解锁。
void PushData(const T& data)
{// 直接让对象来管理锁LockGuard LG(&_mtx); // 构造加锁/* 上了锁之后先判断临界资源是否准备就绪,也就是队列是否满了*/while(IsFull()) pthread_cond_wait(&_full, &_mtx);// 到此处就说明队列不满,就可以push数据了_q.push(data);// 发送消费者消费的信号pthread_cond_signal(&_empty);} // 析构解锁void PopData(T& data)
{// 直接让对象来管理锁LockGuard LG(&_mtx);// 构造加锁/* 上锁后,先判断临界资源是否准备就绪,也就是队列是否为空*/while(IsEmpty()) pthread_cond_wait(&_empty, &_mtx);// 到此处说明队列不为空,就可以pop了data = _q.front(); // 先拿数据再pop_q.pop();// pop完了发送让生产者生产的信号pthread_cond_signal(&_full);} // 析构解锁
运行就是:
和前面没啥区别。不过RAII的思想放到这里非常的妙。
这里也就是RALL风格的加锁。
这篇就讲到这,下一篇细说信号量等知识。
到此结束。。。
相关文章:

【Linux】详解线程第三篇——线程同步和生产消费者模型
线程同步和生消模型 前言正式开始再次用黄牛抢票来讲解线程同步的思想通过条件变量来实现线程同步条件变量接口介绍初始化和销毁pthread_cond_waitsignal和broadcast 生产消费者模型三种关系用基本工程师思维再次理解基于生产消费者模型的阻塞队列版本一版本二多生多消 利用RAI…...
k8s 安装
文章目录 k8s 客户端安装k8s集群minikubekindkubeadm 验证 k8s 客户端 用于连接k8s集群,建议下载1.23.x的版本,其他的版本本地运行可能会有莫名其妙的报错 https://dl.k8s.io/release/v1.23.16/bin/linux/amd64/kubectl 安装k8s集群 minikube Minik…...

红队打靶:THE PLANETS: MERCURY打靶思路详解(vulnhub)
目录 写在开头 第一步:主机发现和端口扫描 第二步:Web渗透 第三步:获取初步立足点并搜集信息 第四步:软连接劫持sudo提权 总结与思考 写在开头 本篇博客在自己的理解之上根据大佬红队笔记的视频进行打靶,详述了…...

【网络协议】IP
当连接多个异构的局域网形成强烈需求时,用户不满足于仅在一个局域网内进行通信,他们希望通过更高一层协议最终实现异构网络之间的连接。既然需要通过更高一层的协议将多个局域网进行互联,那么这个协议就必须为不同的局域网环境定义统一的寻址…...
Python 布尔类型
布尔值表示两个值之一:True(真)或False(假)。 布尔值 在编程中,您经常需要知道一个表达式是否为True或False。 您可以在Python中评估任何表达式,并获得两个答案之一:True或False。…...

iOS设备管理器iMazing比iTunes好用吗?有哪些优势
虽然 iTunes 是 Apple 官方指定的 iPhone 数据备份和管理工具,但是一直以来 iTunes 卡顿的使用体验和过慢的备份过程为不少人诟病。如果大家也被 iTunes 体验不佳的备份和管理功能所困扰,那么简单易用、功能强大的iMazing 能为你解决这个问题。 iMazing…...
Opengl之深度测试
在坐标系统小节中,我们渲染了一个3D箱子,并且运用了深度缓冲(Depth Buffer)来防止被阻挡的面渲染到其它面的前面。在这一节中,我们将会更加深入地讨论这些储存在深度缓冲(或z缓冲(z-buffer))中的深度值(Depth Value),以及它们是如何确定一个片段是处于其它片段后方的。 …...
利用ICG-NH2/Amine进行DNA标记1686147-55-6星戈瑞
ICG-NH2(吲哚菁绿胺)可以用于DNA标记,这种标记方法通常涉及到DNA上的胺基反应基团和ICG-NH2之间的化学反应。以下是一种常见的方法,用于利用ICG-NH2标记DNA分子: 步骤: 1.准备目标DNA:你需要准…...
Pyecharts数据可视化
Pyecharts数据可视化 1、Pyecharts模块2、柱状图3、折线图4、饼图5、散点图6、图表合并7、词云8、地图 1、Pyecharts模块 ECharts是百度提供的基于JavaScript的开源可视化库,主要用于Web端数据可视化 Echarts是通过JS实现的,Pyecharts则可以使用Python来…...

集合-List集合
系列文章目录 1.集合-Collection-CSDN博客 2.集合-List集合-CSDN博客 文章目录 目录 系列文章目录 文章目录 前言 一 . 什么是List? 二 . List集合的特点 三 . 常用方法 1.void add(int index, E element): 将指定的元素插入到列表的指定位置。 2.E remove(int in…...
vuex的使用
1 vuex的使用 1 vuex的使用 store/index.js -在Vue中实现集中式状态(数据)管理的一个Vue插件,对vue应用中多个组件的共享状态进行集中式 的管理(读/写),也是一种组件间通信的方式,且适用于任意…...

raw图片处理软件:DxO PhotoLab 6 mac中文版支持相机格式
DxO PhotoLab 6 mac是一款专业的RAW图片处理软件,适用于Mac操作系统。它具有先进的图像处理技术和直观易用的界面,可帮助用户轻松地将RAW格式的照片转换为高质量的JPEG或TIFF图像。 DxO PhotoLab 6支持多种相机品牌的RAW格式,包括佳能、尼康、…...
ReactPortals传送门
ReactPortals传送门 React Portals提供了一种将子节点渲染到父组件以外的DOM节点的解决方案,即允许将JSX作为children渲染至DOM的不同部分,最常见用例是子组件需要从视觉上脱离父容器,例如对话框、浮动工具栏、提示信息等。 描述 <div&…...

【GDB】 command 命令
GDB command 命令 语法 command 命令是一个很好用的调试命令,它配合断点使用,可以在指定的断点执行预先设置的命令 其语法为:command bread_id,这样会提示你输入你要执行的命令,以 end 结束。这个 bread_id 就是用 …...

1038 统计同成绩学生
输入样例: 10 60 75 90 55 75 99 82 90 75 50 3 75 90 88 输出样例: 3 2 0 solution #include <stdio.h> int main(){int n, d, k, hash[101] {0}, a[100000];scanf("%d", &n);for(int i 0; i < n; i){scanf("%d&quo…...

git报错:Failed to connect to 127.0.0.1 port 1080
Bug描述 由于在试了网上的这条命令 git config --global http.proxy socks5 127.0.0.1:1080 git config --global https.proxy socks5 127.0.0.1:1080git config --global http.proxy 127.0.0.1:1080 git config --global https.proxy 127.0.0.1:1080Bug描述:Faile…...

php eayswoole node axios crypto-js 实现大文件分片上传复盘
不啰嗦 直接上步骤 步骤1.开发环境配置 项目需要node.js 做前端支撑 官网下载地址: http://nodejs.cn/download/ 根据自己需要下载对应的版本,我下载的是windows系统64位的版本。 包下载好后 进行安装,安装步骤在此省略... 测试是否安装成功 …...

《Upload-Labs》01. Pass 1~13
Upload-Labs 索引前言Pass-01题解 Pass-02题解总结 Pass-03题解总结 Pass-04题解 Pass-05题解总结 Pass-06题解总结 Pass-07题解总结 Pass-08题解总结 Pass-09题解 Pass-10题解 Pass-11题解 Pass-12题解总结 Pass-13题解 靶场部署在 VMware - Win7。 靶场地址:https…...
v-for中的key
在Vue中,当使用v-for指令循环渲染元素时,添加:key是一个推荐做法,尤其是在循环的元素可能会被重新排序、添加或删除的情况下。 :key的作用是为每个循环的元素提供一个唯一的标识符,以便Vue能够跟踪和管理这些元素的状态。Vue使用…...

MySQL学习笔记17
MySQL权限管理grant: 权限说明: Table 6.2 Permissible Privileges for GRANT and REVOKE PrivilegeGrant Table ColumnContextALL [PRIVILEGES]Synonym for “all privileges”Server administrationALTERAlter_privTablesALTER ROUTINEAlter_routin…...

XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...

大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
【Linux】shell脚本忽略错误继续执行
在 shell 脚本中,可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行,可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令,并忽略错误 rm somefile…...

智慧医疗能源事业线深度画像分析(上)
引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...
<6>-MySQL表的增删查改
目录 一,create(创建表) 二,retrieve(查询表) 1,select列 2,where条件 三,update(更新表) 四,delete(删除表…...

css实现圆环展示百分比,根据值动态展示所占比例
代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...

.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...