当前位置: 首页 > news >正文

【Linux】基于阻塞队列的生产者消费者模型

​🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
在这里插入图片描述

目录

    • 👉为何要使用生产者消费者模型👈
    • 👉生产者消费者模型的优点👈
    • 👉基于阻塞队列的生产者和消费者模型👈
    • 👉总结👈

👉为何要使用生产者消费者模型👈

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型在生活中是相当常见的,比如客户去超市里买商品就是一个很好的例子。在这个例子中,供货商就是生产者,顾客就是消费者,超市就是一个交易场所,本质是一个商品的缓冲区。超市存在的意义就是让生产者和消费者解耦,以提高效率。


在生产者消费者模型中,有一个交易场所超市,生产者和消费者两种角色,生产者和生产者、消费者和消费者、生产者和消费者三种关系。其中生产者和生产者之间的关系是竞争互斥关系,消费者和消费者之间的关系是竞争互斥关系,生产者和消费者之间的关系是互斥和同步关系。当生产者进行生产时,消费者不能进行消费(保证安全),所以生产者和消费者是互斥关系。当商品很少时,生产者需要进行生产,消费者需要进行等待;而当商品很多时,生产者不能进行生产,消费者进行消费,所以生产行为和消费行为具有一定的顺序性,也就是说生产者和消费者是同步关系。

在计算机世界里,生产者和消费者都是通过线程模拟出来的。当生产者线程生产完商品后,就可以通知消费者线程来进行消费;而当消费者线程消费完商品后,就可以通知生产者线程来进行生产,这个过程就是通过条件变量来实现的。

👉生产者消费者模型的优点👈

想要真正理解生产者消费者模型优点,我们需要知道生产者生产数据和消费者消费数据都是想要消耗时间的。那么当消费者线程进行消费数据时,该线程是不会访问阻塞队列的。所以当消费者线程进行消费时,生产者线程从其他地方获取数据并将该数据放入到阻塞队列中,这样就可以提高生产者线程和消费者线程的并发度了。所以生产者消费者模型具有一下优点:解耦、支持并发。

👉基于阻塞队列的生产者和消费者模型👈

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。注:管道本身就是一个阻塞队列,有数据就消费,没数据就等待。管道内部已经实现了互斥和同步的功能。

在这里插入图片描述

单生产者单消费者模型

// BlockQueue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>const int DefaultCap = 5;template <class T>
class BlockQueue
{
private:bool isEmpty(){return _bq.empty();}bool isFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = DefaultCap): _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_empty, nullptr);pthread_cond_init(&_full, nullptr);}// 生产者void push(const T& in){// 访问临界资源需要进行加锁保护pthread_mutex_lock(&_mtx);// 1.先检查当前的临界资源是否满足访问条件// 检查临界资源是否满足访问条件,也是访问临界资源// 所以它也需要在加锁和解锁之间// pthread_cond_wait函数是让线程在特定的条件变量下阻塞等待// 当进行等待时,线程所持有的锁会被自动释放掉.当条件变量满足// 时,线程会被在阻塞挂起的地方被唤醒.线程被唤醒的时候,是在临// 界区中的,pthread_cond_wait会自动帮助线程获取锁while(isFull())pthread_cond_wait(&_full, &_mtx);// pthread_cond_wait是一个函数,它就可能会调用失败,从而出现// 伪唤醒(临界资源不满足访问条件,却往下进行临界资源的访问)的// 情况,所以需要通过while来保证满足访问临界资源的条件,而不能// 通过if来判断临界资源是否能被访问// 2.访问临界资源(来到这里,临界资源100%是就绪的!)_bq.push(in);// 可以指定特定的唤醒线程的策略,如数据的个数大于容量的一半// if(2 * _bq.size() >= _capacity) pthread_cond_signal(&_empty);// 唤醒线程可以在释放锁之前,也可以在释放锁之后pthread_cond_signal(&_empty);pthread_mutex_unlock(&_mtx);}// 消费者void pop(T* out){pthread_mutex_lock(&_mtx);while(isEmpty())pthread_cond_wait(&_empty, &_mtx);*out = _bq.front();_bq.pop();pthread_cond_signal(&_full);pthread_mutex_unlock(&_mtx);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_empty);pthread_cond_destroy(&_full);}private:std::queue<T> _bq;     // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)int _capacity;         // 容量上线pthread_mutex_t _mtx;  // 互斥锁保护队列的安全pthread_cond_t _empty; // 该条件变量表示bq是否为空pthread_cond_t _full;  // 该条件变量表示bq是否为满
};// ConProd.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>void* consume(void* args)
{BlockQueue<int>* bq = (BlockQueue<int>*)args;while(1){int a;bq->pop(&a);std::cout << "消费了一个数据:" << a << std::endl;sleep(1);}return nullptr;
}void* produce(void* args)
{BlockQueue<int>* bq = (BlockQueue<int>*)args;int a = 1;while(1){bq->push(a);std::cout << "生产了一个数据:" << a << std::endl;++a;}return nullptr;
}int main()
{BlockQueue<int>* bq = new BlockQueue<int>();pthread_t c, p;pthread_create(&c, nullptr, consume, (void*)bq);pthread_create(&p, nullptr, produce, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

在这里插入图片描述

增加任务类

// Task.hpp
#pragma once#include <iostream>
#include <functional>typedef std::function<int(int, int)> func_t;class Task
{
public:Task() = default;Task(int x, int y, func_t func): _x(x), _y(y), _func(func){}int operator()(){return _func(_x, _y);}public:int _x;int _y;func_t _func;
};// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>int Add(int x, int y)
{return x + y;
}void* consume(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 获取任务Task t;bq->pop(&t);// 完成任务std::cout << "consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;}return nullptr;
}void* produce(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 制作任务 -- 不一定从生产者来的,可能是从网络来的int x = rand() % 10 + 1;int y = rand() % 20 + 1;// 生产任务Task t(x, y, Add);bq->push(t);std::cout << "productor: " << x << " + " << y << " = ?" << std::endl; sleep(1);}return nullptr;
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);BlockQueue<Task>* bq = new BlockQueue<Task>();pthread_t c, p;pthread_create(&c, nullptr, consume, (void*)bq);pthread_create(&p, nullptr, produce, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

在这里插入图片描述

多生产者多消费者

// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>int Add(int x, int y)
{return x + y;
}void* consume(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 获取任务Task t;bq->pop(&t);// 完成任务std::cout << pthread_self() << " consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;}return nullptr;
}void* produce(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 制作任务 -- 不一定从生产者来的,可能是从网络来的int x = rand() % 10 + 1;int y = rand() % 20 + 1;// int x, y;// std::cout << "Please Enter x: ";// std::cin >> x;// std::cout << "Please Enter y: ";// std::cin >> y;// 生产任务Task t(x, y, Add);bq->push(t);std::cout << pthread_self() << " productor: " << x << " + " << y << " = ?" << std::endl; sleep(1);}return nullptr;
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);BlockQueue<Task>* bq = new BlockQueue<Task>();// pthread_t c, p;pthread_t c[2], p[2];pthread_create(c, nullptr, consume, (void*)bq);pthread_create(c + 1, nullptr, consume, (void*)bq);pthread_create(p, nullptr, produce, (void*)bq);pthread_create(p + 1, nullptr, produce, (void*)bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bq;return 0;
}

多生产者多消费者模型的意义就是让生产者并发地获取和制作任务,让消费者并发地完成消费任务。多生产者多消费者模型主要用于处理消费任务或者获取和制作任务比较耗时的场景。

锁的封装

// LockGuard.hpp
#pragma once#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t* pmtx): _pmtx(pmtx){}void lock(){std::cout << "进行加锁" << std::endl;pthread_mutex_lock(_pmtx);}void unlock(){std::cout << "进行解锁" << std::endl;pthread_mutex_unlock(_pmtx);}~Mutex(){}private:pthread_mutex_t* _pmtx;
};// RAII的加锁方式
class LockGuard
{
public:LockGuard(pthread_mutex_t* pmtx): _mtx(pmtx){_mtx.lock();}~LockGuard(){_mtx.unlock();}private:Mutex _mtx;
};// BlockQueue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"const int DefaultCap = 5;template <class T>
class BlockQueue
{
private:bool isEmpty(){return _bq.empty();}bool isFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = DefaultCap): _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_empty, nullptr);pthread_cond_init(&_full, nullptr);}// 生产者void push(const T &in){// 出了函数的作用域会自动解锁LockGuard lockGuard(&_mtx);while (isFull())pthread_cond_wait(&_full, &_mtx);_bq.push(in);pthread_cond_signal(&_empty);}// 消费者void pop(T *out){// 出了函数的作用域会自动解锁LockGuard lockGuard(&_mtx);while (isEmpty())pthread_cond_wait(&_empty, &_mtx);*out = _bq.front();_bq.pop();pthread_cond_signal(&_full);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_empty);pthread_cond_destroy(&_full);}private:std::queue<T> _bq;     // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)int _capacity;         // 容量上线pthread_mutex_t _mtx;  // 互斥锁保护队列的安全pthread_cond_t _empty; // 该条件变量表示bq是否为空pthread_cond_t _full;  // 该条件变量表示bq是否为满
};

👉总结👈

本篇博客主要讲解了为什么要使用生产者消费者模型、基于阻塞队列的生产者和消费者模型以及 RAII 的加锁方式等等。那么以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家!💖💝❣️

相关文章:

【Linux】基于阻塞队列的生产者消费者模型

​&#x1f320; 作者&#xff1a;阿亮joy. &#x1f386;专栏&#xff1a;《学会Linux》 &#x1f387; 座右铭&#xff1a;每个优秀的人都有一段沉默的时光&#xff0c;那段时光是付出了很多努力却得不到结果的日子&#xff0c;我们把它叫做扎根 目录&#x1f449;为何要使用…...

【华为OD机试 2023最新 】 真正的密码(C++)

文章目录 题目描述输入描述输出描述用例题目解析C++题目描述 在一行中输入一个字符串数组,如果其中一个字符串的所有以索引0开头的子串在数组中都有,那么这个字符串就是潜在密码, 在所有潜在密码中最长的是真正的密码,如果有多个长度相同的真正的密码,那么取字典序最大的…...

差分算法(蓝桥杯复习+例题讲解+模板c++)

文章目录差分介绍差分应用区间加区间求和总结3729. 改变数组元素100. 增减序列文章首发于&#xff1a;My Blog 欢迎大佬们前来逛逛 差分介绍 差分是一种常见的算法&#xff0c;用于快速修改数组中某一段区间的值。 差分的思想就是预处理出数组的差分数组&#xff0c;然后修改…...

CSS+ JS 实现手电筒效果

前言概述 JavaScript 结合 CSS 打造的一款图片特效&#xff0c;当鼠标拖拽滑块时&#xff0c;让本该置灰的图片局部恢复本来的颜色。且该效果随着你的鼠标的按下时的移动而移动。 核心功能 图片置灰 拖拽功能 让滑块位置处的图片恢复本来的颜色 实现原理 这个的实现原理并不…...

2021地理设计组二等奖:基于InSAR和指数分析的地面沉降风

作品简介 一、作品背景 地面沉降是指地面高程的降低, 又称地面下沉或地沉, 是以缓慢、难以察觉的向下垂直运动为主, 是指在自然和人为因素作用下, 由于地壳表层土体压缩而导致区域性地面标高降低的一种环境现象。目前, 地面沉降己成为城市化进程中普遍存在的生态环境问题, 成为…...

计算机操作系统(第四版)第二章进程的描述与控制—课后习题答案

1.什么是前趋图&#xff1f;为什么要引入前趋图&#xff1f; 前趋图是一个有向无循环图&#xff0c;记为DAG&#xff0c;用于描述进程之间执行的先后关系。 2.试画出下面四条语句的前趋图&#xff1a; S1:axy; S2:bz1; S3:ca-b; S4:wc1; 3.为什么程序并发执行会产生间断性特征&…...

CAN通信----电路图

CAN通信----基本原理 一、CAN总线网络连接 1.闭环总线网络----ISO11898 闭环总线网络高速、短距离&#xff0c;它的总线最大长度为 40m&#xff0c;通信速度最高为 1Mbps&#xff0c;总线的两端各要求有一个120 欧的电阻。 2.开环总线网络----ISO11519 开环总线网络低速、…...

Windows系统安装ElasticSearch(一)

一 ES介绍Elasticsearch 是一个分布式可扩展的实时搜索和分析引擎,一个建立在全文搜索引擎 Apache Lucene(TM) 基础上的搜索引擎.当然 Elasticsearch 并不仅仅是 Lucene 那么简单&#xff0c;它不仅包括了全文搜索功能&#xff0c;还可以进行以下工作:分布式实时文件存储&#…...

linux 产生随机数 并遍历

1、产生随机数 varRANDOMvarRANDOM varRANDOMvar[ $var % 150 ] 2、产生不重复的随机数 $ entries($(shuf -i 0-149 -n 15)) $ echo “${entries[]}” 3、对随机数排序 $ entries($(shuf -i 0-149 -n 15 | sort -n)) $ echo “entries[]"12224549546678798393118119124140…...

【3.24】Mybatis常见面试题

Mybatis常见面试题 #{}和&#xffe5;{}的区别是什么&#xff1f; 【#】&#xff1a;底层执行SQL使用PreparedStatement对象&#xff0c;预编译SQL&#xff0c;相对安全。入参使用占位符的方式。 【$】&#xff1a;底层执行SQL使用Statement对象&#xff0c;入参使用SQL拼接的…...

IDEA 热部署,修改代码不用重启项目

热部署指在修改项目代码的时候不重启服务器让修改生效。安装JRebel and XRebelFile->Settings&#xff0c;然后Plugins-> Marketplace&#xff0c;输入JRebel&#xff0c;安装如下插件——JRebel and XRebel &#xff0c;重启idea激活JRebel and XRebel第一行输入网址&am…...

将 XLS 转换为 EXE:xlCompiler Crack

只需单击几下即可将Excel文件转换为应用程序 xl编译器无需编程即可将您的Excel电子表格转换为软件应用程序 将 XLS 转换为 EXE 将Excel文件转换为具有保护选项的应用程序。Excel 到 EXE 转换器为您提供了分发 Excel 模型的竞争优势和灵活性。将 Excel 的功能丰富的环境保存在应…...

【百面成神】spring基础12问,你能坚持到第几问

前 言 &#x1f349; 作者简介&#xff1a;半旧518&#xff0c;长跑型选手&#xff0c;立志坚持写10年博客&#xff0c;专注于java后端 ☕专栏简介&#xff1a;java面试宝典&#xff0c;特点&#xff1a;全、精、深、简&#xff0c;力求每个核心知识点1分钟回答好。 &#x1f3…...

javaSE类和对象(下)

目录君1.封装2.访问限定符3.包的定义及使用4.static成员变量5.static成员方法6.代码块及其分类实例代码块静态代码块静态代码块与实例代码块的执行顺序static成员变量(类变量)初始化1.封装 面向对象程序三大特性&#xff1a;封装、继承、多态。而类和对象阶段&#xff0c;主要…...

【数据结构】第四站:单链表力扣题(二)

目录 一、链表的回文结构 二、相交链表 三、环形链表 四、环形链表Ⅱ 五、复制带随机指针的链表 一、链表的回文结构 题目描述&#xff1a;链表的回文结构_牛客题霸_牛客网 对于这道题&#xff0c;如果没有前面的一些题的基础&#xff0c;是非常难做的&#xff0c;我们的思…...

KafKa知识汇总

前言 汇总相关知识 Kafka快速实战与基本原理详解...

【RV1126】调试GT911,1024x600 7寸 MIPI 电容触摸屏

文章目录一、驱动注册失败二、触摸屏可以触摸&#xff0c;但是x轴数据反了三、可以触摸了&#xff0c;但是Y轴数据跳变&#xff0c;几乎只有一半的屏幕是可以正常滑动的三、汇顶触摸屏配置文件解析四、使用新的配置文件4.1 新配置解决问题4.2 测试触摸的方法在kernel增加frame …...

C的强符号/弱符号

首先上代码和结果&#xff1a; 代码&#xff1a; #include <stdio.h> int k; int k; int main() {printf("addr of k %p\n", &k);printf("value of k %d\n", k);return 0; }结果&#xff1a; addr of k 00408074 value of k 0问题&…...

AD/DA转换(XPT2046)

AD/DA介绍AD&#xff08;Analog to Digital&#xff09;&#xff1a;模拟-数字转换&#xff0c;将模拟信号转换为计算机可操作的数字信号DA&#xff08;Digital to Analog&#xff09;&#xff1a;数字-模拟转换&#xff0c;将计算机输出的数字信号转换为模拟信号AD/DA转换打开…...

乐观锁和悲观锁 面试题

Mysql的乐观锁和悲观锁 实现方式加锁时机常见的调用方式优势不足适用场景乐观锁开发自定义更新数据的时候sql语句中进行version的判断高并发容易出现不一致的问题高并发读&#xff0c;少写悲观锁Mysql内置查询数据的开始select * for update保证一致性低并发互联网高并发场景极…...

conda相比python好处

Conda 作为 Python 的环境和包管理工具&#xff0c;相比原生 Python 生态&#xff08;如 pip 虚拟环境&#xff09;有许多独特优势&#xff0c;尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处&#xff1a; 一、一站式环境管理&#xff1a…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

【网络安全】开源系统getshell漏洞挖掘

审计过程&#xff1a; 在入口文件admin/index.php中&#xff1a; 用户可以通过m,c,a等参数控制加载的文件和方法&#xff0c;在app/system/entrance.php中存在重点代码&#xff1a; 当M_TYPE system并且M_MODULE include时&#xff0c;会设置常量PATH_OWN_FILE为PATH_APP.M_T…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

【LeetCode】算法详解#6 ---除自身以外数组的乘积

1.题目介绍 给定一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O…...

Linux部署私有文件管理系统MinIO

最近需要用到一个文件管理服务&#xff0c;但是又不想花钱&#xff0c;所以就想着自己搭建一个&#xff0c;刚好我们用的一个开源框架已经集成了MinIO&#xff0c;所以就选了这个 我这边对文件服务性能要求不是太高&#xff0c;单机版就可以 安装非常简单&#xff0c;几个命令就…...

在Spring Boot中集成RabbitMQ的完整指南

前言 在现代微服务架构中&#xff0c;消息队列&#xff08;Message Queue&#xff09;是实现异步通信、解耦系统组件的重要工具。RabbitMQ 是一个流行的消息中间件&#xff0c;支持多种消息协议&#xff0c;具有高可靠性和可扩展性。 本博客将详细介绍如何在 Spring Boot 项目…...

HTML版英语学习系统

HTML版英语学习系统 这是一个完全免费、无需安装、功能完整的英语学习工具&#xff0c;使用HTML CSS JavaScript实现。 功能 文本朗读练习 - 输入英文文章&#xff0c;系统朗读帮助练习听力和发音&#xff0c;适合跟读练习&#xff0c;模仿学习&#xff1b;实时词典查询 - 双…...