当前位置: 首页 > news >正文

【Linux】基于阻塞队列的生产者消费者模型

​🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
在这里插入图片描述

目录

    • 👉为何要使用生产者消费者模型👈
    • 👉生产者消费者模型的优点👈
    • 👉基于阻塞队列的生产者和消费者模型👈
    • 👉总结👈

👉为何要使用生产者消费者模型👈

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型在生活中是相当常见的,比如客户去超市里买商品就是一个很好的例子。在这个例子中,供货商就是生产者,顾客就是消费者,超市就是一个交易场所,本质是一个商品的缓冲区。超市存在的意义就是让生产者和消费者解耦,以提高效率。


在生产者消费者模型中,有一个交易场所超市,生产者和消费者两种角色,生产者和生产者、消费者和消费者、生产者和消费者三种关系。其中生产者和生产者之间的关系是竞争互斥关系,消费者和消费者之间的关系是竞争互斥关系,生产者和消费者之间的关系是互斥和同步关系。当生产者进行生产时,消费者不能进行消费(保证安全),所以生产者和消费者是互斥关系。当商品很少时,生产者需要进行生产,消费者需要进行等待;而当商品很多时,生产者不能进行生产,消费者进行消费,所以生产行为和消费行为具有一定的顺序性,也就是说生产者和消费者是同步关系。

在计算机世界里,生产者和消费者都是通过线程模拟出来的。当生产者线程生产完商品后,就可以通知消费者线程来进行消费;而当消费者线程消费完商品后,就可以通知生产者线程来进行生产,这个过程就是通过条件变量来实现的。

👉生产者消费者模型的优点👈

想要真正理解生产者消费者模型优点,我们需要知道生产者生产数据和消费者消费数据都是想要消耗时间的。那么当消费者线程进行消费数据时,该线程是不会访问阻塞队列的。所以当消费者线程进行消费时,生产者线程从其他地方获取数据并将该数据放入到阻塞队列中,这样就可以提高生产者线程和消费者线程的并发度了。所以生产者消费者模型具有一下优点:解耦、支持并发。

👉基于阻塞队列的生产者和消费者模型👈

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。注:管道本身就是一个阻塞队列,有数据就消费,没数据就等待。管道内部已经实现了互斥和同步的功能。

在这里插入图片描述

单生产者单消费者模型

// BlockQueue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>const int DefaultCap = 5;template <class T>
class BlockQueue
{
private:bool isEmpty(){return _bq.empty();}bool isFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = DefaultCap): _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_empty, nullptr);pthread_cond_init(&_full, nullptr);}// 生产者void push(const T& in){// 访问临界资源需要进行加锁保护pthread_mutex_lock(&_mtx);// 1.先检查当前的临界资源是否满足访问条件// 检查临界资源是否满足访问条件,也是访问临界资源// 所以它也需要在加锁和解锁之间// pthread_cond_wait函数是让线程在特定的条件变量下阻塞等待// 当进行等待时,线程所持有的锁会被自动释放掉.当条件变量满足// 时,线程会被在阻塞挂起的地方被唤醒.线程被唤醒的时候,是在临// 界区中的,pthread_cond_wait会自动帮助线程获取锁while(isFull())pthread_cond_wait(&_full, &_mtx);// pthread_cond_wait是一个函数,它就可能会调用失败,从而出现// 伪唤醒(临界资源不满足访问条件,却往下进行临界资源的访问)的// 情况,所以需要通过while来保证满足访问临界资源的条件,而不能// 通过if来判断临界资源是否能被访问// 2.访问临界资源(来到这里,临界资源100%是就绪的!)_bq.push(in);// 可以指定特定的唤醒线程的策略,如数据的个数大于容量的一半// if(2 * _bq.size() >= _capacity) pthread_cond_signal(&_empty);// 唤醒线程可以在释放锁之前,也可以在释放锁之后pthread_cond_signal(&_empty);pthread_mutex_unlock(&_mtx);}// 消费者void pop(T* out){pthread_mutex_lock(&_mtx);while(isEmpty())pthread_cond_wait(&_empty, &_mtx);*out = _bq.front();_bq.pop();pthread_cond_signal(&_full);pthread_mutex_unlock(&_mtx);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_empty);pthread_cond_destroy(&_full);}private:std::queue<T> _bq;     // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)int _capacity;         // 容量上线pthread_mutex_t _mtx;  // 互斥锁保护队列的安全pthread_cond_t _empty; // 该条件变量表示bq是否为空pthread_cond_t _full;  // 该条件变量表示bq是否为满
};// ConProd.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>void* consume(void* args)
{BlockQueue<int>* bq = (BlockQueue<int>*)args;while(1){int a;bq->pop(&a);std::cout << "消费了一个数据:" << a << std::endl;sleep(1);}return nullptr;
}void* produce(void* args)
{BlockQueue<int>* bq = (BlockQueue<int>*)args;int a = 1;while(1){bq->push(a);std::cout << "生产了一个数据:" << a << std::endl;++a;}return nullptr;
}int main()
{BlockQueue<int>* bq = new BlockQueue<int>();pthread_t c, p;pthread_create(&c, nullptr, consume, (void*)bq);pthread_create(&p, nullptr, produce, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

在这里插入图片描述

增加任务类

// Task.hpp
#pragma once#include <iostream>
#include <functional>typedef std::function<int(int, int)> func_t;class Task
{
public:Task() = default;Task(int x, int y, func_t func): _x(x), _y(y), _func(func){}int operator()(){return _func(_x, _y);}public:int _x;int _y;func_t _func;
};// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>int Add(int x, int y)
{return x + y;
}void* consume(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 获取任务Task t;bq->pop(&t);// 完成任务std::cout << "consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;}return nullptr;
}void* produce(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 制作任务 -- 不一定从生产者来的,可能是从网络来的int x = rand() % 10 + 1;int y = rand() % 20 + 1;// 生产任务Task t(x, y, Add);bq->push(t);std::cout << "productor: " << x << " + " << y << " = ?" << std::endl; sleep(1);}return nullptr;
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);BlockQueue<Task>* bq = new BlockQueue<Task>();pthread_t c, p;pthread_create(&c, nullptr, consume, (void*)bq);pthread_create(&p, nullptr, produce, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

在这里插入图片描述

多生产者多消费者

// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>int Add(int x, int y)
{return x + y;
}void* consume(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 获取任务Task t;bq->pop(&t);// 完成任务std::cout << pthread_self() << " consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;}return nullptr;
}void* produce(void* args)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)args;while(1){// 制作任务 -- 不一定从生产者来的,可能是从网络来的int x = rand() % 10 + 1;int y = rand() % 20 + 1;// int x, y;// std::cout << "Please Enter x: ";// std::cin >> x;// std::cout << "Please Enter y: ";// std::cin >> y;// 生产任务Task t(x, y, Add);bq->push(t);std::cout << pthread_self() << " productor: " << x << " + " << y << " = ?" << std::endl; sleep(1);}return nullptr;
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);BlockQueue<Task>* bq = new BlockQueue<Task>();// pthread_t c, p;pthread_t c[2], p[2];pthread_create(c, nullptr, consume, (void*)bq);pthread_create(c + 1, nullptr, consume, (void*)bq);pthread_create(p, nullptr, produce, (void*)bq);pthread_create(p + 1, nullptr, produce, (void*)bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bq;return 0;
}

多生产者多消费者模型的意义就是让生产者并发地获取和制作任务,让消费者并发地完成消费任务。多生产者多消费者模型主要用于处理消费任务或者获取和制作任务比较耗时的场景。

锁的封装

// LockGuard.hpp
#pragma once#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t* pmtx): _pmtx(pmtx){}void lock(){std::cout << "进行加锁" << std::endl;pthread_mutex_lock(_pmtx);}void unlock(){std::cout << "进行解锁" << std::endl;pthread_mutex_unlock(_pmtx);}~Mutex(){}private:pthread_mutex_t* _pmtx;
};// RAII的加锁方式
class LockGuard
{
public:LockGuard(pthread_mutex_t* pmtx): _mtx(pmtx){_mtx.lock();}~LockGuard(){_mtx.unlock();}private:Mutex _mtx;
};// BlockQueue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"const int DefaultCap = 5;template <class T>
class BlockQueue
{
private:bool isEmpty(){return _bq.empty();}bool isFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = DefaultCap): _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_empty, nullptr);pthread_cond_init(&_full, nullptr);}// 生产者void push(const T &in){// 出了函数的作用域会自动解锁LockGuard lockGuard(&_mtx);while (isFull())pthread_cond_wait(&_full, &_mtx);_bq.push(in);pthread_cond_signal(&_empty);}// 消费者void pop(T *out){// 出了函数的作用域会自动解锁LockGuard lockGuard(&_mtx);while (isEmpty())pthread_cond_wait(&_empty, &_mtx);*out = _bq.front();_bq.pop();pthread_cond_signal(&_full);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_empty);pthread_cond_destroy(&_full);}private:std::queue<T> _bq;     // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)int _capacity;         // 容量上线pthread_mutex_t _mtx;  // 互斥锁保护队列的安全pthread_cond_t _empty; // 该条件变量表示bq是否为空pthread_cond_t _full;  // 该条件变量表示bq是否为满
};

👉总结👈

本篇博客主要讲解了为什么要使用生产者消费者模型、基于阻塞队列的生产者和消费者模型以及 RAII 的加锁方式等等。那么以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家!💖💝❣️

相关文章:

【Linux】基于阻塞队列的生产者消费者模型

​&#x1f320; 作者&#xff1a;阿亮joy. &#x1f386;专栏&#xff1a;《学会Linux》 &#x1f387; 座右铭&#xff1a;每个优秀的人都有一段沉默的时光&#xff0c;那段时光是付出了很多努力却得不到结果的日子&#xff0c;我们把它叫做扎根 目录&#x1f449;为何要使用…...

【华为OD机试 2023最新 】 真正的密码(C++)

文章目录 题目描述输入描述输出描述用例题目解析C++题目描述 在一行中输入一个字符串数组,如果其中一个字符串的所有以索引0开头的子串在数组中都有,那么这个字符串就是潜在密码, 在所有潜在密码中最长的是真正的密码,如果有多个长度相同的真正的密码,那么取字典序最大的…...

差分算法(蓝桥杯复习+例题讲解+模板c++)

文章目录差分介绍差分应用区间加区间求和总结3729. 改变数组元素100. 增减序列文章首发于&#xff1a;My Blog 欢迎大佬们前来逛逛 差分介绍 差分是一种常见的算法&#xff0c;用于快速修改数组中某一段区间的值。 差分的思想就是预处理出数组的差分数组&#xff0c;然后修改…...

CSS+ JS 实现手电筒效果

前言概述 JavaScript 结合 CSS 打造的一款图片特效&#xff0c;当鼠标拖拽滑块时&#xff0c;让本该置灰的图片局部恢复本来的颜色。且该效果随着你的鼠标的按下时的移动而移动。 核心功能 图片置灰 拖拽功能 让滑块位置处的图片恢复本来的颜色 实现原理 这个的实现原理并不…...

2021地理设计组二等奖:基于InSAR和指数分析的地面沉降风

作品简介 一、作品背景 地面沉降是指地面高程的降低, 又称地面下沉或地沉, 是以缓慢、难以察觉的向下垂直运动为主, 是指在自然和人为因素作用下, 由于地壳表层土体压缩而导致区域性地面标高降低的一种环境现象。目前, 地面沉降己成为城市化进程中普遍存在的生态环境问题, 成为…...

计算机操作系统(第四版)第二章进程的描述与控制—课后习题答案

1.什么是前趋图&#xff1f;为什么要引入前趋图&#xff1f; 前趋图是一个有向无循环图&#xff0c;记为DAG&#xff0c;用于描述进程之间执行的先后关系。 2.试画出下面四条语句的前趋图&#xff1a; S1:axy; S2:bz1; S3:ca-b; S4:wc1; 3.为什么程序并发执行会产生间断性特征&…...

CAN通信----电路图

CAN通信----基本原理 一、CAN总线网络连接 1.闭环总线网络----ISO11898 闭环总线网络高速、短距离&#xff0c;它的总线最大长度为 40m&#xff0c;通信速度最高为 1Mbps&#xff0c;总线的两端各要求有一个120 欧的电阻。 2.开环总线网络----ISO11519 开环总线网络低速、…...

Windows系统安装ElasticSearch(一)

一 ES介绍Elasticsearch 是一个分布式可扩展的实时搜索和分析引擎,一个建立在全文搜索引擎 Apache Lucene(TM) 基础上的搜索引擎.当然 Elasticsearch 并不仅仅是 Lucene 那么简单&#xff0c;它不仅包括了全文搜索功能&#xff0c;还可以进行以下工作:分布式实时文件存储&#…...

linux 产生随机数 并遍历

1、产生随机数 varRANDOMvarRANDOM varRANDOMvar[ $var % 150 ] 2、产生不重复的随机数 $ entries($(shuf -i 0-149 -n 15)) $ echo “${entries[]}” 3、对随机数排序 $ entries($(shuf -i 0-149 -n 15 | sort -n)) $ echo “entries[]"12224549546678798393118119124140…...

【3.24】Mybatis常见面试题

Mybatis常见面试题 #{}和&#xffe5;{}的区别是什么&#xff1f; 【#】&#xff1a;底层执行SQL使用PreparedStatement对象&#xff0c;预编译SQL&#xff0c;相对安全。入参使用占位符的方式。 【$】&#xff1a;底层执行SQL使用Statement对象&#xff0c;入参使用SQL拼接的…...

IDEA 热部署,修改代码不用重启项目

热部署指在修改项目代码的时候不重启服务器让修改生效。安装JRebel and XRebelFile->Settings&#xff0c;然后Plugins-> Marketplace&#xff0c;输入JRebel&#xff0c;安装如下插件——JRebel and XRebel &#xff0c;重启idea激活JRebel and XRebel第一行输入网址&am…...

将 XLS 转换为 EXE:xlCompiler Crack

只需单击几下即可将Excel文件转换为应用程序 xl编译器无需编程即可将您的Excel电子表格转换为软件应用程序 将 XLS 转换为 EXE 将Excel文件转换为具有保护选项的应用程序。Excel 到 EXE 转换器为您提供了分发 Excel 模型的竞争优势和灵活性。将 Excel 的功能丰富的环境保存在应…...

【百面成神】spring基础12问,你能坚持到第几问

前 言 &#x1f349; 作者简介&#xff1a;半旧518&#xff0c;长跑型选手&#xff0c;立志坚持写10年博客&#xff0c;专注于java后端 ☕专栏简介&#xff1a;java面试宝典&#xff0c;特点&#xff1a;全、精、深、简&#xff0c;力求每个核心知识点1分钟回答好。 &#x1f3…...

javaSE类和对象(下)

目录君1.封装2.访问限定符3.包的定义及使用4.static成员变量5.static成员方法6.代码块及其分类实例代码块静态代码块静态代码块与实例代码块的执行顺序static成员变量(类变量)初始化1.封装 面向对象程序三大特性&#xff1a;封装、继承、多态。而类和对象阶段&#xff0c;主要…...

【数据结构】第四站:单链表力扣题(二)

目录 一、链表的回文结构 二、相交链表 三、环形链表 四、环形链表Ⅱ 五、复制带随机指针的链表 一、链表的回文结构 题目描述&#xff1a;链表的回文结构_牛客题霸_牛客网 对于这道题&#xff0c;如果没有前面的一些题的基础&#xff0c;是非常难做的&#xff0c;我们的思…...

KafKa知识汇总

前言 汇总相关知识 Kafka快速实战与基本原理详解...

【RV1126】调试GT911,1024x600 7寸 MIPI 电容触摸屏

文章目录一、驱动注册失败二、触摸屏可以触摸&#xff0c;但是x轴数据反了三、可以触摸了&#xff0c;但是Y轴数据跳变&#xff0c;几乎只有一半的屏幕是可以正常滑动的三、汇顶触摸屏配置文件解析四、使用新的配置文件4.1 新配置解决问题4.2 测试触摸的方法在kernel增加frame …...

C的强符号/弱符号

首先上代码和结果&#xff1a; 代码&#xff1a; #include <stdio.h> int k; int k; int main() {printf("addr of k %p\n", &k);printf("value of k %d\n", k);return 0; }结果&#xff1a; addr of k 00408074 value of k 0问题&…...

AD/DA转换(XPT2046)

AD/DA介绍AD&#xff08;Analog to Digital&#xff09;&#xff1a;模拟-数字转换&#xff0c;将模拟信号转换为计算机可操作的数字信号DA&#xff08;Digital to Analog&#xff09;&#xff1a;数字-模拟转换&#xff0c;将计算机输出的数字信号转换为模拟信号AD/DA转换打开…...

乐观锁和悲观锁 面试题

Mysql的乐观锁和悲观锁 实现方式加锁时机常见的调用方式优势不足适用场景乐观锁开发自定义更新数据的时候sql语句中进行version的判断高并发容易出现不一致的问题高并发读&#xff0c;少写悲观锁Mysql内置查询数据的开始select * for update保证一致性低并发互联网高并发场景极…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

Ubuntu系统下交叉编译openssl

一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机&#xff1a;Ubuntu 20.04.6 LTSHost&#xff1a;ARM32位交叉编译器&#xff1a;arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

Qt Widget类解析与代码注释

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码&#xff0c;写上注释 当然可以&#xff01;这段代码是 Qt …...

HTML前端开发:JavaScript 常用事件详解

作为前端开发的核心&#xff0c;JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例&#xff1a; 1. onclick - 点击事件 当元素被单击时触发&#xff08;左键点击&#xff09; button.onclick function() {alert("按钮被点击了&#xff01;&…...

CSS | transition 和 transform的用处和区别

省流总结&#xff1a; transform用于变换/变形&#xff0c;transition是动画控制器 transform 用来对元素进行变形&#xff0c;常见的操作如下&#xff0c;它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...

[论文阅读]TrustRAG: Enhancing Robustness and Trustworthiness in RAG

TrustRAG: Enhancing Robustness and Trustworthiness in RAG [2501.00879] TrustRAG: Enhancing Robustness and Trustworthiness in Retrieval-Augmented Generation 代码&#xff1a;HuichiZhou/TrustRAG: Code for "TrustRAG: Enhancing Robustness and Trustworthin…...

第一篇:Liunx环境下搭建PaddlePaddle 3.0基础环境(Liunx Centos8.5安装Python3.10+pip3.10)

第一篇&#xff1a;Liunx环境下搭建PaddlePaddle 3.0基础环境&#xff08;Liunx Centos8.5安装Python3.10pip3.10&#xff09; 一&#xff1a;前言二&#xff1a;安装编译依赖二&#xff1a;安装Python3.10三&#xff1a;安装PIP3.10四&#xff1a;安装Paddlepaddle基础框架4.1…...

[USACO23FEB] Bakery S

题目描述 Bessie 开了一家面包店! 在她的面包店里&#xff0c;Bessie 有一个烤箱&#xff0c;可以在 t C t_C tC​ 的时间内生产一块饼干或在 t M t_M tM​ 单位时间内生产一块松糕。 ( 1 ≤ t C , t M ≤ 10 9 ) (1 \le t_C,t_M \le 10^9) (1≤tC​,tM​≤109)。由于空间…...

如何把工业通信协议转换成http websocket

1.现状 工业通信协议多数工作在边缘设备上&#xff0c;比如&#xff1a;PLC、IOT盒子等。上层业务系统需要根据不同的工业协议做对应开发&#xff0c;当设备上用的是modbus从站时&#xff0c;采集设备数据需要开发modbus主站&#xff1b;当设备上用的是西门子PN协议时&#xf…...