【Linux】基于阻塞队列的生产者消费者模型
🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
目录
- 👉为何要使用生产者消费者模型👈
- 👉生产者消费者模型的优点👈
- 👉基于阻塞队列的生产者和消费者模型👈
- 👉总结👈
👉为何要使用生产者消费者模型👈
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型在生活中是相当常见的,比如客户去超市里买商品就是一个很好的例子。在这个例子中,供货商就是生产者,顾客就是消费者,超市就是一个交易场所,本质是一个商品的缓冲区。超市存在的意义就是让生产者和消费者解耦,以提高效率。
在生产者消费者模型中,有一个交易场所超市,生产者和消费者两种角色,生产者和生产者、消费者和消费者、生产者和消费者三种关系。其中生产者和生产者之间的关系是竞争互斥关系,消费者和消费者之间的关系是竞争互斥关系,生产者和消费者之间的关系是互斥和同步关系。当生产者进行生产时,消费者不能进行消费(保证安全),所以生产者和消费者是互斥关系。当商品很少时,生产者需要进行生产,消费者需要进行等待;而当商品很多时,生产者不能进行生产,消费者进行消费,所以生产行为和消费行为具有一定的顺序性,也就是说生产者和消费者是同步关系。
在计算机世界里,生产者和消费者都是通过线程模拟出来的。当生产者线程生产完商品后,就可以通知消费者线程来进行消费;而当消费者线程消费完商品后,就可以通知生产者线程来进行生产,这个过程就是通过条件变量来实现的。
👉生产者消费者模型的优点👈
想要真正理解生产者消费者模型优点,我们需要知道生产者生产数据和消费者消费数据都是想要消耗时间的。那么当消费者线程进行消费数据时,该线程是不会访问阻塞队列的。所以当消费者线程进行消费时,生产者线程从其他地方获取数据并将该数据放入到阻塞队列中,这样就可以提高生产者线程和消费者线程的并发度了。所以生产者消费者模型具有一下优点:解耦、支持并发。
👉基于阻塞队列的生产者和消费者模型👈
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。注:管道本身就是一个阻塞队列,有数据就消费,没数据就等待。管道内部已经实现了互斥和同步的功能。
单生产者单消费者模型
// BlockQueue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>const int DefaultCap = 5;template <class T>
class BlockQueue
{
private:bool isEmpty(){return _bq.empty();}bool isFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = DefaultCap): _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_empty, nullptr);pthread_cond_init(&_full, nullptr);}// 生产者void push(const T& in){// 访问临界资源需要进行加锁保护pthread_mutex_lock(&_mtx);// 1.先检查当前的临界资源是否满足访问条件// 检查临界资源是否满足访问条件,也是访问临界资源// 所以它也需要在加锁和解锁之间// pthread_cond_wait函数是让线程在特定的条件变量下阻塞等待// 当进行等待时,线程所持有的锁会被自动释放掉.当条件变量满足// 时,线程会被在阻塞挂起的地方被唤醒.线程被唤醒的时候,是在临// 界区中的,pthread_cond_wait会自动帮助线程获取锁while(isFull())pthread_cond_wait(&_full, &_mtx);// pthread_cond_wait是一个函数,它就可能会调用失败,从而出现// 伪唤醒(临界资源不满足访问条件,却往下进行临界资源的访问)的// 情况,所以需要通过while来保证满足访问临界资源的条件,而不能// 通过if来判断临界资源是否能被访问// 2.访问临界资源(来到这里,临界资源100%是就绪的!)_bq.push(in);// 可以指定特定的唤醒线程的策略,如数据的个数大于容量的一半// if(2 * _bq.size() >= _capacity) pthread_cond_signal(&_empty);// 唤醒线程可以在释放锁之前,也可以在释放锁之后pthread_cond_signal(&_empty);pthread_mutex_unlock(&_mtx);}// 消费者void pop(T* out){pthread_mutex_lock(&_mtx);while(isEmpty())pthread_cond_wait(&_empty, &_mtx);*out = _bq.front();_bq.pop();pthread_cond_signal(&_full);pthread_mutex_unlock(&_mtx);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_empty);pthread_cond_destroy(&_full);}private:std::queue<T> _bq; // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)int _capacity; // 容量上线pthread_mutex_t _mtx; // 互斥锁保护队列的安全pthread_cond_t _empty; // 该条件变量表示bq是否为空pthread_cond_t _full; // 该条件变量表示bq是否为满
};// ConProd.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>void* consume(void* args)
{BlockQueue<int>* bq = (BlockQueue<int>*)args;while(1){int a;bq->pop(&a);std::cout << "消费了一个数据:" << a << std::endl;sleep(1);}return nullptr;
}void* produce(void* args)
{BlockQueue<int>* bq = (BlockQueue<int>*)args;int a = 1;while(1){bq->push(a);std::cout << "生产了一个数据:" << a << std::endl;++a;}return nullptr;
}int main()
{BlockQueue<int>* bq = new BlockQueue<int>();pthread_t c, p;pthread_create(&c, nullptr, consume, (void*)bq);pthread_create(&p, nullptr, produce, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

增加任务类
// Task.hpp
#pragma once#include <iostream>
#include <functional>typedef std::function<int(int, int)> func_t;class Task
{
public:Task() = default;Task(int x, int y, func_t func): _x(x), _y(y), _func(func){}int operator()(){return _func(_x, _y);}public:int _x;int _y;func_t _func;
};// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>int Add(int x, int y)
{return x + y;
}void* consume(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 获取任务Task t;bq->pop(&t);// 完成任务std::cout << "consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;}return nullptr;
}void* produce(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 制作任务 -- 不一定从生产者来的,可能是从网络来的int x = rand() % 10 + 1;int y = rand() % 20 + 1;// 生产任务Task t(x, y, Add);bq->push(t);std::cout << "productor: " << x << " + " << y << " = ?" << std::endl; sleep(1);}return nullptr;
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);BlockQueue<Task>* bq = new BlockQueue<Task>();pthread_t c, p;pthread_create(&c, nullptr, consume, (void*)bq);pthread_create(&p, nullptr, produce, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

多生产者多消费者
// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>int Add(int x, int y)
{return x + y;
}void* consume(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 获取任务Task t;bq->pop(&t);// 完成任务std::cout << pthread_self() << " consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;}return nullptr;
}void* produce(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 制作任务 -- 不一定从生产者来的,可能是从网络来的int x = rand() % 10 + 1;int y = rand() % 20 + 1;// int x, y;// std::cout << "Please Enter x: ";// std::cin >> x;// std::cout << "Please Enter y: ";// std::cin >> y;// 生产任务Task t(x, y, Add);bq->push(t);std::cout << pthread_self() << " productor: " << x << " + " << y << " = ?" << std::endl; sleep(1);}return nullptr;
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);BlockQueue<Task>* bq = new BlockQueue<Task>();// pthread_t c, p;pthread_t c[2], p[2];pthread_create(c, nullptr, consume, (void*)bq);pthread_create(c + 1, nullptr, consume, (void*)bq);pthread_create(p, nullptr, produce, (void*)bq);pthread_create(p + 1, nullptr, produce, (void*)bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bq;return 0;
}
多生产者多消费者模型的意义就是让生产者并发地获取和制作任务,让消费者并发地完成消费任务。多生产者多消费者模型主要用于处理消费任务或者获取和制作任务比较耗时的场景。
锁的封装
// LockGuard.hpp
#pragma once#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t* pmtx): _pmtx(pmtx){}void lock(){std::cout << "进行加锁" << std::endl;pthread_mutex_lock(_pmtx);}void unlock(){std::cout << "进行解锁" << std::endl;pthread_mutex_unlock(_pmtx);}~Mutex(){}private:pthread_mutex_t* _pmtx;
};// RAII的加锁方式
class LockGuard
{
public:LockGuard(pthread_mutex_t* pmtx): _mtx(pmtx){_mtx.lock();}~LockGuard(){_mtx.unlock();}private:Mutex _mtx;
};// BlockQueue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"const int DefaultCap = 5;template <class T>
class BlockQueue
{
private:bool isEmpty(){return _bq.empty();}bool isFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = DefaultCap): _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_empty, nullptr);pthread_cond_init(&_full, nullptr);}// 生产者void push(const T &in){// 出了函数的作用域会自动解锁LockGuard lockGuard(&_mtx);while (isFull())pthread_cond_wait(&_full, &_mtx);_bq.push(in);pthread_cond_signal(&_empty);}// 消费者void pop(T *out){// 出了函数的作用域会自动解锁LockGuard lockGuard(&_mtx);while (isEmpty())pthread_cond_wait(&_empty, &_mtx);*out = _bq.front();_bq.pop();pthread_cond_signal(&_full);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_empty);pthread_cond_destroy(&_full);}private:std::queue<T> _bq; // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)int _capacity; // 容量上线pthread_mutex_t _mtx; // 互斥锁保护队列的安全pthread_cond_t _empty; // 该条件变量表示bq是否为空pthread_cond_t _full; // 该条件变量表示bq是否为满
};
👉总结👈
本篇博客主要讲解了为什么要使用生产者消费者模型、基于阻塞队列的生产者和消费者模型以及 RAII 的加锁方式等等。那么以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家!💖💝❣️
相关文章:
【Linux】基于阻塞队列的生产者消费者模型
🌠 作者:阿亮joy. 🎆专栏:《学会Linux》 🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根 目录👉为何要使用…...
【华为OD机试 2023最新 】 真正的密码(C++)
文章目录 题目描述输入描述输出描述用例题目解析C++题目描述 在一行中输入一个字符串数组,如果其中一个字符串的所有以索引0开头的子串在数组中都有,那么这个字符串就是潜在密码, 在所有潜在密码中最长的是真正的密码,如果有多个长度相同的真正的密码,那么取字典序最大的…...
差分算法(蓝桥杯复习+例题讲解+模板c++)
文章目录差分介绍差分应用区间加区间求和总结3729. 改变数组元素100. 增减序列文章首发于:My Blog 欢迎大佬们前来逛逛 差分介绍 差分是一种常见的算法,用于快速修改数组中某一段区间的值。 差分的思想就是预处理出数组的差分数组,然后修改…...
CSS+ JS 实现手电筒效果
前言概述 JavaScript 结合 CSS 打造的一款图片特效,当鼠标拖拽滑块时,让本该置灰的图片局部恢复本来的颜色。且该效果随着你的鼠标的按下时的移动而移动。 核心功能 图片置灰 拖拽功能 让滑块位置处的图片恢复本来的颜色 实现原理 这个的实现原理并不…...
2021地理设计组二等奖:基于InSAR和指数分析的地面沉降风
作品简介 一、作品背景 地面沉降是指地面高程的降低, 又称地面下沉或地沉, 是以缓慢、难以察觉的向下垂直运动为主, 是指在自然和人为因素作用下, 由于地壳表层土体压缩而导致区域性地面标高降低的一种环境现象。目前, 地面沉降己成为城市化进程中普遍存在的生态环境问题, 成为…...
计算机操作系统(第四版)第二章进程的描述与控制—课后习题答案
1.什么是前趋图?为什么要引入前趋图? 前趋图是一个有向无循环图,记为DAG,用于描述进程之间执行的先后关系。 2.试画出下面四条语句的前趋图: S1:axy; S2:bz1; S3:ca-b; S4:wc1; 3.为什么程序并发执行会产生间断性特征&…...
CAN通信----电路图
CAN通信----基本原理 一、CAN总线网络连接 1.闭环总线网络----ISO11898 闭环总线网络高速、短距离,它的总线最大长度为 40m,通信速度最高为 1Mbps,总线的两端各要求有一个120 欧的电阻。 2.开环总线网络----ISO11519 开环总线网络低速、…...
Windows系统安装ElasticSearch(一)
一 ES介绍Elasticsearch 是一个分布式可扩展的实时搜索和分析引擎,一个建立在全文搜索引擎 Apache Lucene(TM) 基础上的搜索引擎.当然 Elasticsearch 并不仅仅是 Lucene 那么简单,它不仅包括了全文搜索功能,还可以进行以下工作:分布式实时文件存储&#…...
linux 产生随机数 并遍历
1、产生随机数 varRANDOMvarRANDOM varRANDOMvar[ $var % 150 ] 2、产生不重复的随机数 $ entries($(shuf -i 0-149 -n 15)) $ echo “${entries[]}” 3、对随机数排序 $ entries($(shuf -i 0-149 -n 15 | sort -n)) $ echo “entries[]"12224549546678798393118119124140…...
【3.24】Mybatis常见面试题
Mybatis常见面试题 #{}和¥{}的区别是什么? 【#】:底层执行SQL使用PreparedStatement对象,预编译SQL,相对安全。入参使用占位符的方式。 【$】:底层执行SQL使用Statement对象,入参使用SQL拼接的…...
IDEA 热部署,修改代码不用重启项目
热部署指在修改项目代码的时候不重启服务器让修改生效。安装JRebel and XRebelFile->Settings,然后Plugins-> Marketplace,输入JRebel,安装如下插件——JRebel and XRebel ,重启idea激活JRebel and XRebel第一行输入网址&am…...
将 XLS 转换为 EXE:xlCompiler Crack
只需单击几下即可将Excel文件转换为应用程序 xl编译器无需编程即可将您的Excel电子表格转换为软件应用程序 将 XLS 转换为 EXE 将Excel文件转换为具有保护选项的应用程序。Excel 到 EXE 转换器为您提供了分发 Excel 模型的竞争优势和灵活性。将 Excel 的功能丰富的环境保存在应…...
【百面成神】spring基础12问,你能坚持到第几问
前 言 🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端 ☕专栏简介:java面试宝典,特点:全、精、深、简,力求每个核心知识点1分钟回答好。 dz…...
javaSE类和对象(下)
目录君1.封装2.访问限定符3.包的定义及使用4.static成员变量5.static成员方法6.代码块及其分类实例代码块静态代码块静态代码块与实例代码块的执行顺序static成员变量(类变量)初始化1.封装 面向对象程序三大特性:封装、继承、多态。而类和对象阶段,主要…...
【数据结构】第四站:单链表力扣题(二)
目录 一、链表的回文结构 二、相交链表 三、环形链表 四、环形链表Ⅱ 五、复制带随机指针的链表 一、链表的回文结构 题目描述:链表的回文结构_牛客题霸_牛客网 对于这道题,如果没有前面的一些题的基础,是非常难做的,我们的思…...
KafKa知识汇总
前言 汇总相关知识 Kafka快速实战与基本原理详解...
【RV1126】调试GT911,1024x600 7寸 MIPI 电容触摸屏
文章目录一、驱动注册失败二、触摸屏可以触摸,但是x轴数据反了三、可以触摸了,但是Y轴数据跳变,几乎只有一半的屏幕是可以正常滑动的三、汇顶触摸屏配置文件解析四、使用新的配置文件4.1 新配置解决问题4.2 测试触摸的方法在kernel增加frame …...
C的强符号/弱符号
首先上代码和结果: 代码: #include <stdio.h> int k; int k; int main() {printf("addr of k %p\n", &k);printf("value of k %d\n", k);return 0; }结果: addr of k 00408074 value of k 0问题&…...
AD/DA转换(XPT2046)
AD/DA介绍AD(Analog to Digital):模拟-数字转换,将模拟信号转换为计算机可操作的数字信号DA(Digital to Analog):数字-模拟转换,将计算机输出的数字信号转换为模拟信号AD/DA转换打开…...
乐观锁和悲观锁 面试题
Mysql的乐观锁和悲观锁 实现方式加锁时机常见的调用方式优势不足适用场景乐观锁开发自定义更新数据的时候sql语句中进行version的判断高并发容易出现不一致的问题高并发读,少写悲观锁Mysql内置查询数据的开始select * for update保证一致性低并发互联网高并发场景极…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
Android Wi-Fi 连接失败日志分析
1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...
Spring Boot 实现流式响应(兼容 2.7.x)
在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...
macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用
文章目录 问题现象问题原因解决办法 问题现象 macOS启动台(Launchpad)多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显,都是Google家的办公全家桶。这些应用并不是通过独立安装的…...
生成 Git SSH 证书
🔑 1. 生成 SSH 密钥对 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" 参数说明: -t rsa&#x…...
【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
uniapp微信小程序视频实时流+pc端预览方案
方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度WebSocket图片帧定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐RTMP推流TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...
Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
android13 app的触摸问题定位分析流程
一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...
【Linux系统】Linux环境变量:系统配置的隐形指挥官
。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量:setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...


