当前位置: 首页 > news >正文

【Linux】基于阻塞队列的生产者消费者模型

​🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
在这里插入图片描述

目录

    • 👉为何要使用生产者消费者模型👈
    • 👉生产者消费者模型的优点👈
    • 👉基于阻塞队列的生产者和消费者模型👈
    • 👉总结👈

👉为何要使用生产者消费者模型👈

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型在生活中是相当常见的,比如客户去超市里买商品就是一个很好的例子。在这个例子中,供货商就是生产者,顾客就是消费者,超市就是一个交易场所,本质是一个商品的缓冲区。超市存在的意义就是让生产者和消费者解耦,以提高效率。


在生产者消费者模型中,有一个交易场所超市,生产者和消费者两种角色,生产者和生产者、消费者和消费者、生产者和消费者三种关系。其中生产者和生产者之间的关系是竞争互斥关系,消费者和消费者之间的关系是竞争互斥关系,生产者和消费者之间的关系是互斥和同步关系。当生产者进行生产时,消费者不能进行消费(保证安全),所以生产者和消费者是互斥关系。当商品很少时,生产者需要进行生产,消费者需要进行等待;而当商品很多时,生产者不能进行生产,消费者进行消费,所以生产行为和消费行为具有一定的顺序性,也就是说生产者和消费者是同步关系。

在计算机世界里,生产者和消费者都是通过线程模拟出来的。当生产者线程生产完商品后,就可以通知消费者线程来进行消费;而当消费者线程消费完商品后,就可以通知生产者线程来进行生产,这个过程就是通过条件变量来实现的。

👉生产者消费者模型的优点👈

想要真正理解生产者消费者模型优点,我们需要知道生产者生产数据和消费者消费数据都是想要消耗时间的。那么当消费者线程进行消费数据时,该线程是不会访问阻塞队列的。所以当消费者线程进行消费时,生产者线程从其他地方获取数据并将该数据放入到阻塞队列中,这样就可以提高生产者线程和消费者线程的并发度了。所以生产者消费者模型具有一下优点:解耦、支持并发。

👉基于阻塞队列的生产者和消费者模型👈

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。注:管道本身就是一个阻塞队列,有数据就消费,没数据就等待。管道内部已经实现了互斥和同步的功能。

在这里插入图片描述

单生产者单消费者模型

// BlockQueue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>const int DefaultCap = 5;template <class T>
class BlockQueue
{
private:bool isEmpty(){return _bq.empty();}bool isFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = DefaultCap): _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_empty, nullptr);pthread_cond_init(&_full, nullptr);}// 生产者void push(const T& in){// 访问临界资源需要进行加锁保护pthread_mutex_lock(&_mtx);// 1.先检查当前的临界资源是否满足访问条件// 检查临界资源是否满足访问条件,也是访问临界资源// 所以它也需要在加锁和解锁之间// pthread_cond_wait函数是让线程在特定的条件变量下阻塞等待// 当进行等待时,线程所持有的锁会被自动释放掉.当条件变量满足// 时,线程会被在阻塞挂起的地方被唤醒.线程被唤醒的时候,是在临// 界区中的,pthread_cond_wait会自动帮助线程获取锁while(isFull())pthread_cond_wait(&_full, &_mtx);// pthread_cond_wait是一个函数,它就可能会调用失败,从而出现// 伪唤醒(临界资源不满足访问条件,却往下进行临界资源的访问)的// 情况,所以需要通过while来保证满足访问临界资源的条件,而不能// 通过if来判断临界资源是否能被访问// 2.访问临界资源(来到这里,临界资源100%是就绪的!)_bq.push(in);// 可以指定特定的唤醒线程的策略,如数据的个数大于容量的一半// if(2 * _bq.size() >= _capacity) pthread_cond_signal(&_empty);// 唤醒线程可以在释放锁之前,也可以在释放锁之后pthread_cond_signal(&_empty);pthread_mutex_unlock(&_mtx);}// 消费者void pop(T* out){pthread_mutex_lock(&_mtx);while(isEmpty())pthread_cond_wait(&_empty, &_mtx);*out = _bq.front();_bq.pop();pthread_cond_signal(&_full);pthread_mutex_unlock(&_mtx);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_empty);pthread_cond_destroy(&_full);}private:std::queue<T> _bq;     // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)int _capacity;         // 容量上线pthread_mutex_t _mtx;  // 互斥锁保护队列的安全pthread_cond_t _empty; // 该条件变量表示bq是否为空pthread_cond_t _full;  // 该条件变量表示bq是否为满
};// ConProd.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>void* consume(void* args)
{BlockQueue<int>* bq = (BlockQueue<int>*)args;while(1){int a;bq->pop(&a);std::cout << "消费了一个数据:" << a << std::endl;sleep(1);}return nullptr;
}void* produce(void* args)
{BlockQueue<int>* bq = (BlockQueue<int>*)args;int a = 1;while(1){bq->push(a);std::cout << "生产了一个数据:" << a << std::endl;++a;}return nullptr;
}int main()
{BlockQueue<int>* bq = new BlockQueue<int>();pthread_t c, p;pthread_create(&c, nullptr, consume, (void*)bq);pthread_create(&p, nullptr, produce, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

在这里插入图片描述

增加任务类

// Task.hpp
#pragma once#include <iostream>
#include <functional>typedef std::function<int(int, int)> func_t;class Task
{
public:Task() = default;Task(int x, int y, func_t func): _x(x), _y(y), _func(func){}int operator()(){return _func(_x, _y);}public:int _x;int _y;func_t _func;
};// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>int Add(int x, int y)
{return x + y;
}void* consume(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 获取任务Task t;bq->pop(&t);// 完成任务std::cout << "consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;}return nullptr;
}void* produce(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 制作任务 -- 不一定从生产者来的,可能是从网络来的int x = rand() % 10 + 1;int y = rand() % 20 + 1;// 生产任务Task t(x, y, Add);bq->push(t);std::cout << "productor: " << x << " + " << y << " = ?" << std::endl; sleep(1);}return nullptr;
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);BlockQueue<Task>* bq = new BlockQueue<Task>();pthread_t c, p;pthread_create(&c, nullptr, consume, (void*)bq);pthread_create(&p, nullptr, produce, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

在这里插入图片描述

多生产者多消费者

// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>int Add(int x, int y)
{return x + y;
}void* consume(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 获取任务Task t;bq->pop(&t);// 完成任务std::cout << pthread_self() << " consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;}return nullptr;
}void* produce(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 制作任务 -- 不一定从生产者来的,可能是从网络来的int x = rand() % 10 + 1;int y = rand() % 20 + 1;// int x, y;// std::cout << "Please Enter x: ";// std::cin >> x;// std::cout << "Please Enter y: ";// std::cin >> y;// 生产任务Task t(x, y, Add);bq->push(t);std::cout << pthread_self() << " productor: " << x << " + " << y << " = ?" << std::endl; sleep(1);}return nullptr;
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);BlockQueue<Task>* bq = new BlockQueue<Task>();// pthread_t c, p;pthread_t c[2], p[2];pthread_create(c, nullptr, consume, (void*)bq);pthread_create(c + 1, nullptr, consume, (void*)bq);pthread_create(p, nullptr, produce, (void*)bq);pthread_create(p + 1, nullptr, produce, (void*)bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bq;return 0;
}

多生产者多消费者模型的意义就是让生产者并发地获取和制作任务,让消费者并发地完成消费任务。多生产者多消费者模型主要用于处理消费任务或者获取和制作任务比较耗时的场景。

锁的封装

// LockGuard.hpp
#pragma once#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t* pmtx): _pmtx(pmtx){}void lock(){std::cout << "进行加锁" << std::endl;pthread_mutex_lock(_pmtx);}void unlock(){std::cout << "进行解锁" << std::endl;pthread_mutex_unlock(_pmtx);}~Mutex(){}private:pthread_mutex_t* _pmtx;
};// RAII的加锁方式
class LockGuard
{
public:LockGuard(pthread_mutex_t* pmtx): _mtx(pmtx){_mtx.lock();}~LockGuard(){_mtx.unlock();}private:Mutex _mtx;
};// BlockQueue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"const int DefaultCap = 5;template <class T>
class BlockQueue
{
private:bool isEmpty(){return _bq.empty();}bool isFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = DefaultCap): _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_empty, nullptr);pthread_cond_init(&_full, nullptr);}// 生产者void push(const T &in){// 出了函数的作用域会自动解锁LockGuard lockGuard(&_mtx);while (isFull())pthread_cond_wait(&_full, &_mtx);_bq.push(in);pthread_cond_signal(&_empty);}// 消费者void pop(T *out){// 出了函数的作用域会自动解锁LockGuard lockGuard(&_mtx);while (isEmpty())pthread_cond_wait(&_empty, &_mtx);*out = _bq.front();_bq.pop();pthread_cond_signal(&_full);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_empty);pthread_cond_destroy(&_full);}private:std::queue<T> _bq;     // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)int _capacity;         // 容量上线pthread_mutex_t _mtx;  // 互斥锁保护队列的安全pthread_cond_t _empty; // 该条件变量表示bq是否为空pthread_cond_t _full;  // 该条件变量表示bq是否为满
};

👉总结👈

本篇博客主要讲解了为什么要使用生产者消费者模型、基于阻塞队列的生产者和消费者模型以及 RAII 的加锁方式等等。那么以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家!💖💝❣️

相关文章:

【Linux】基于阻塞队列的生产者消费者模型

​&#x1f320; 作者&#xff1a;阿亮joy. &#x1f386;专栏&#xff1a;《学会Linux》 &#x1f387; 座右铭&#xff1a;每个优秀的人都有一段沉默的时光&#xff0c;那段时光是付出了很多努力却得不到结果的日子&#xff0c;我们把它叫做扎根 目录&#x1f449;为何要使用…...

【华为OD机试 2023最新 】 真正的密码(C++)

文章目录 题目描述输入描述输出描述用例题目解析C++题目描述 在一行中输入一个字符串数组,如果其中一个字符串的所有以索引0开头的子串在数组中都有,那么这个字符串就是潜在密码, 在所有潜在密码中最长的是真正的密码,如果有多个长度相同的真正的密码,那么取字典序最大的…...

差分算法(蓝桥杯复习+例题讲解+模板c++)

文章目录差分介绍差分应用区间加区间求和总结3729. 改变数组元素100. 增减序列文章首发于&#xff1a;My Blog 欢迎大佬们前来逛逛 差分介绍 差分是一种常见的算法&#xff0c;用于快速修改数组中某一段区间的值。 差分的思想就是预处理出数组的差分数组&#xff0c;然后修改…...

CSS+ JS 实现手电筒效果

前言概述 JavaScript 结合 CSS 打造的一款图片特效&#xff0c;当鼠标拖拽滑块时&#xff0c;让本该置灰的图片局部恢复本来的颜色。且该效果随着你的鼠标的按下时的移动而移动。 核心功能 图片置灰 拖拽功能 让滑块位置处的图片恢复本来的颜色 实现原理 这个的实现原理并不…...

2021地理设计组二等奖:基于InSAR和指数分析的地面沉降风

作品简介 一、作品背景 地面沉降是指地面高程的降低, 又称地面下沉或地沉, 是以缓慢、难以察觉的向下垂直运动为主, 是指在自然和人为因素作用下, 由于地壳表层土体压缩而导致区域性地面标高降低的一种环境现象。目前, 地面沉降己成为城市化进程中普遍存在的生态环境问题, 成为…...

计算机操作系统(第四版)第二章进程的描述与控制—课后习题答案

1.什么是前趋图&#xff1f;为什么要引入前趋图&#xff1f; 前趋图是一个有向无循环图&#xff0c;记为DAG&#xff0c;用于描述进程之间执行的先后关系。 2.试画出下面四条语句的前趋图&#xff1a; S1:axy; S2:bz1; S3:ca-b; S4:wc1; 3.为什么程序并发执行会产生间断性特征&…...

CAN通信----电路图

CAN通信----基本原理 一、CAN总线网络连接 1.闭环总线网络----ISO11898 闭环总线网络高速、短距离&#xff0c;它的总线最大长度为 40m&#xff0c;通信速度最高为 1Mbps&#xff0c;总线的两端各要求有一个120 欧的电阻。 2.开环总线网络----ISO11519 开环总线网络低速、…...

Windows系统安装ElasticSearch(一)

一 ES介绍Elasticsearch 是一个分布式可扩展的实时搜索和分析引擎,一个建立在全文搜索引擎 Apache Lucene(TM) 基础上的搜索引擎.当然 Elasticsearch 并不仅仅是 Lucene 那么简单&#xff0c;它不仅包括了全文搜索功能&#xff0c;还可以进行以下工作:分布式实时文件存储&#…...

linux 产生随机数 并遍历

1、产生随机数 varRANDOMvarRANDOM varRANDOMvar[ $var % 150 ] 2、产生不重复的随机数 $ entries($(shuf -i 0-149 -n 15)) $ echo “${entries[]}” 3、对随机数排序 $ entries($(shuf -i 0-149 -n 15 | sort -n)) $ echo “entries[]"12224549546678798393118119124140…...

【3.24】Mybatis常见面试题

Mybatis常见面试题 #{}和&#xffe5;{}的区别是什么&#xff1f; 【#】&#xff1a;底层执行SQL使用PreparedStatement对象&#xff0c;预编译SQL&#xff0c;相对安全。入参使用占位符的方式。 【$】&#xff1a;底层执行SQL使用Statement对象&#xff0c;入参使用SQL拼接的…...

IDEA 热部署,修改代码不用重启项目

热部署指在修改项目代码的时候不重启服务器让修改生效。安装JRebel and XRebelFile->Settings&#xff0c;然后Plugins-> Marketplace&#xff0c;输入JRebel&#xff0c;安装如下插件——JRebel and XRebel &#xff0c;重启idea激活JRebel and XRebel第一行输入网址&am…...

将 XLS 转换为 EXE:xlCompiler Crack

只需单击几下即可将Excel文件转换为应用程序 xl编译器无需编程即可将您的Excel电子表格转换为软件应用程序 将 XLS 转换为 EXE 将Excel文件转换为具有保护选项的应用程序。Excel 到 EXE 转换器为您提供了分发 Excel 模型的竞争优势和灵活性。将 Excel 的功能丰富的环境保存在应…...

【百面成神】spring基础12问,你能坚持到第几问

前 言 &#x1f349; 作者简介&#xff1a;半旧518&#xff0c;长跑型选手&#xff0c;立志坚持写10年博客&#xff0c;专注于java后端 ☕专栏简介&#xff1a;java面试宝典&#xff0c;特点&#xff1a;全、精、深、简&#xff0c;力求每个核心知识点1分钟回答好。 &#x1f3…...

javaSE类和对象(下)

目录君1.封装2.访问限定符3.包的定义及使用4.static成员变量5.static成员方法6.代码块及其分类实例代码块静态代码块静态代码块与实例代码块的执行顺序static成员变量(类变量)初始化1.封装 面向对象程序三大特性&#xff1a;封装、继承、多态。而类和对象阶段&#xff0c;主要…...

【数据结构】第四站:单链表力扣题(二)

目录 一、链表的回文结构 二、相交链表 三、环形链表 四、环形链表Ⅱ 五、复制带随机指针的链表 一、链表的回文结构 题目描述&#xff1a;链表的回文结构_牛客题霸_牛客网 对于这道题&#xff0c;如果没有前面的一些题的基础&#xff0c;是非常难做的&#xff0c;我们的思…...

KafKa知识汇总

前言 汇总相关知识 Kafka快速实战与基本原理详解...

【RV1126】调试GT911,1024x600 7寸 MIPI 电容触摸屏

文章目录一、驱动注册失败二、触摸屏可以触摸&#xff0c;但是x轴数据反了三、可以触摸了&#xff0c;但是Y轴数据跳变&#xff0c;几乎只有一半的屏幕是可以正常滑动的三、汇顶触摸屏配置文件解析四、使用新的配置文件4.1 新配置解决问题4.2 测试触摸的方法在kernel增加frame …...

C的强符号/弱符号

首先上代码和结果&#xff1a; 代码&#xff1a; #include <stdio.h> int k; int k; int main() {printf("addr of k %p\n", &k);printf("value of k %d\n", k);return 0; }结果&#xff1a; addr of k 00408074 value of k 0问题&…...

AD/DA转换(XPT2046)

AD/DA介绍AD&#xff08;Analog to Digital&#xff09;&#xff1a;模拟-数字转换&#xff0c;将模拟信号转换为计算机可操作的数字信号DA&#xff08;Digital to Analog&#xff09;&#xff1a;数字-模拟转换&#xff0c;将计算机输出的数字信号转换为模拟信号AD/DA转换打开…...

乐观锁和悲观锁 面试题

Mysql的乐观锁和悲观锁 实现方式加锁时机常见的调用方式优势不足适用场景乐观锁开发自定义更新数据的时候sql语句中进行version的判断高并发容易出现不一致的问题高并发读&#xff0c;少写悲观锁Mysql内置查询数据的开始select * for update保证一致性低并发互联网高并发场景极…...

【Autoware规控】mpc_follower模型预测控制节点

文章目录1. 技术原理2. 代码实现1. 技术原理 MPC&#xff0c;即Model Predictive Control&#xff08;模型预测控制&#xff09;&#xff0c;是一种基于动态模型的控制算法。MPC算法通过建立系统的数学模型&#xff0c;根据当前状态和一定时间内的预测&#xff0c;优化未来的控…...

成果VR虚拟3D展厅让内容更丰富饱满

随着数字技术的不断发展和普及&#xff0c;数字化展厅成为了一种重要的展示形式。线上虚拟展厅作为数字化展示的一种新形式&#xff0c;采用虚拟现实技术&#xff0c;能够克服时空限制&#xff0c;打破传统展览业的展示模式&#xff0c;为用户提供更加丰富、立体、沉浸式的展览…...

【CE进阶】lua脚本使用

▒ 目录 ▒&#x1f6eb; 导读需求开发环境1️⃣ 脚本窗口Lua ScriptLua EngineAuto assemble2️⃣ 全局变量3️⃣ 进程当前打开的进程ID系统的进程列表系统的顶部窗口列表4️⃣ 线程5️⃣ 输入设备6️⃣ 屏幕7️⃣ 剪贴板&#x1f6ec; 文章小结&#x1f4d6; 参考资料&#x…...

【vue2】近期bug收集与整理02

⭐【前言】 在使用vue2构建页面时候&#xff0c;博主遇到的问题难点以及最终的解决方案。 &#x1f973;博主&#xff1a;初映CY的前说(前端领域) &#x1f918;本文核心&#xff1a;博主遇到的问题与解决思路 ⭐数据枚举文件的使用 同后端那边发送请求的时&#xff0c;请求返…...

2. 01背包问题

文章目录QuestionIdeasCodeQuestion 有 N 件物品和一个容量是 V 的背包。每件物品只能使用一次。 第 i 件物品的体积是 vi &#xff0c;价值是 wi 。 求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不超过背包容量&#xff0c;且总价值最大。 输出最大价值。 输入…...

【Docker】CAdvisor+InfluxDB+Granfana容器监控

文章目录原生命令 docker stats容器监控3剑客CIGCAdvisorInfluxDBGranfanacompose容器编排&#xff0c;一套带走新建目录新建3件套组合的 docker-compose.yml检查配置&#xff0c;有问题才有输出 docker-compose config -q启动docker-compose文件 docker-compose up -d测试浏览…...

k8s 部署nginx 实现集群统一配置,自动更新nginx.conf配置文件 总结

k8s 部署nginx 实现集群统一配置&#xff0c;自动更新nginx.conf配置文件 总结 大纲 1 nginx镜像选择2 创建configmap保存nginx配置文件3 使用inotify监控配置文件变化4 Dockerfile创建5 调整镜像原地址使用阿里云6 创建deploy部署文件部署nginx7 测试使用nginx配置文件同步&…...

动态内存管理(上)——“C”

各位CSDN的uu们你们好呀&#xff0c;今天&#xff0c;小雅兰的内容是动态内存管理噢&#xff0c;下面&#xff0c;让我们进入动态内存管理的世界吧 为什么存在动态内存分配 动态内存函数的介绍 malloc free calloc realloc 常见的动态内存错误 为什么存在动态内存分配 我们已…...

GPT-4发布,这类人才告急,大厂月薪10W+疯抢

ChatGPT最近彻底火出圈&#xff0c;各行各业都在争相报道&#xff0c;甚至连很多官媒都下场“跟风”。ChatGPT的瓜还没吃完&#xff0c;平地一声雷&#xff0c;GPT-4又重磅发布&#xff01; 很多小伙伴瑟瑟发抖&#xff1a;“AI会不会跟自己抢饭碗啊&#xff1f;” 关于“如何…...

MySQL数据库实现主主同步

前言 MySQL主主同步实际上是在主从同步的基础上将从数据库也提升成主数据库&#xff0c;让它们可以互相读写数据库&#xff0c;从数据库变成主数据库&#xff1b;主从相互授权连接&#xff0c;读取对方binlog日志并更新到本地数据库的过程,只要对方数据改变&#xff0c;自己就…...