Linux生产者消费者模型
生产者消费者模型
- 生产者消费者模型
- 生产者消费者模型的概念
- 生产者消费者模型的特点
- 生产者消费者模型优点
- 基于BlockingQueue的生产者消费者模型
- 基于阻塞队列的生产者消费者模型
- 模拟实现基于阻塞队列的生产消费模型
生产者消费者模型
生产者消费者模型的概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:
- 三种关系:生产者与生产者(竞争与互斥关系)、消费者与消费者(竞争与互斥关系)、生产者与消费者(互斥与同步关系);
- 两种角色:生产者与消费者;
- 一个交易场所:通常指内存中的一段缓冲区。
生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?
因为生产者与消费者之间的容器会被多个访问流进行访问,所以我们就需要将该临界资源使用互斥锁保护起来,防止线程安全问题的发生,因此所有的生产者和消费者都会竞争式的申请锁,生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。
生产者和消费者之间为什么会存在同步关系?
如果生产者一直生产,当空间被填满以后,生产者就会停止生产,消费者也是一样,如果消费者一直消费,空间中数据被消耗完了,消费者也会停止消费。
虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。
注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。
生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。
对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合。
基于BlockingQueue的生产者消费者模型
基于阻塞队列的生产者消费者模型
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
阻塞队列的特点在于:
- 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素。
- 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。
知识联系: 看到以上阻塞队列的描述,我们很容易想到的就是管道,而阻塞队列最典型的应用场景实际上就是管道的实现。
模拟实现基于阻塞队列的生产消费模型
我们先以单生产者,单消费者为例:
其中的BlockQueue就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的queue进行实现,下面我们进行一个简单的封装:
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>
#include <mutex>#define NUM 5template <class T>
class BlockQueue
{
private:// 判断队列是否满了bool isQueueFull(){return _bq.size() == _capacity;}// 判断队列是否为空bool isQueueEmpty(){return _bq.size() == 0;}public:// 构造函数BlockQueue(int capacity = NUM) : _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}// 向阻塞队列中插入数据(生产者)void push(const T &in){// 加锁pthread_mutex_lock(&_mtx);while (isQueueFull()){// 如果生产者生产过程中数据满了,就阻塞等待pthread_cond_wait(&_full, &_mtx);}_bq.push(in);// 解锁pthread_mutex_unlock(&_mtx);// 唤醒消费者pthread_cond_signal(&_empty);}// 向阻塞队列中获取数据(消费者)void pop(T &out){// 加锁pthread_mutex_lock(&_mtx);while (isQueueEmpty()){// 如果消费者消费过程中数据空了,就阻塞等待pthread_cond_wait(&_empty, &_mtx);}out = _bq.front();_bq.pop();// 解锁pthread_mutex_unlock(&_mtx);// 唤醒生产者pthread_cond_signal(&_full);}// 析构函数~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}private:// 阻塞队列std::queue<T> _bq;// 阻塞队列最大容器个数int _capacity;// 通过互斥锁来保证队列安全pthread_mutex_t _mtx;// 用来表示_bq是否为满的条件pthread_cond_t _full;// 用来表示_bq是否为空的条件pthread_cond_t _empty;
};
上述代码中需要注意以下几点:
- 我们实现的是单生产者与单消费者的生产者消费者模型,所以我们不需要维护生产者与生产者,消费者与消费者的关系,只需要维护生产者与消费者之间的关系;
- 我们将BlockQueue中的参数模板化,就不会局限于一种类型,以后就可以很好的进行复用;
- 我们将阻塞队列最大容器个数设置为5,表示阻塞队列中存在5个数据以后就不会在进行生生产了,此时你生产者被阻塞;
- 由于生产者与消费者都会访问阻塞队列,阻塞队列即为临界资源,我们需要增加互斥锁来保证线程安全的问题;
- 生产者向阻塞队列中插入数据时,如果阻塞队列满了,生产者就会被阻塞进行等待,直到消费者获取数据完成以后,阻塞队列中存在空余空间,唤醒生产者,进行生产;同理,消费者获取数据时,如果阻塞队列空了,消费者就会被阻塞进行等待,直到生产者生产数据完成以后,唤醒消费者,进行消费;
- 我们需要定义两个条件变量
_full
和_empty
描述阻塞队列的状态,进而才可以判断何时运行,何时等待; pthread_cond_wait
除了会传入一个条件变量以外还会传入一个互斥锁,我们会发现,我们是在临界区中进行等待的,我们此时还处于持有锁状态,pthread_cond_wait
第二个参数意义就在于成功调用wait之后,传入的锁,会被自动释放,当被唤醒的时候,就会自动获取线程锁;
判断是否满足生产消费条件时不能用if,而应该用while:
pthread_cond_wait
函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。- 其次,在多消费者的情况下,当生产者生产了一个数据后如果使用
pthread_cond_broadcast
函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。 - 为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。
在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据:
#include "BlockQueue.hpp"void* consumer(void* args)
{BlockQueue<int>* bqueue = (BlockQueue<int>*)args;while(true){int a;bqueue->pop(a);std::cout << "consumer:" << a << std::endl;sleep(1);}return nullptr;
}void* productor(void* args)
{BlockQueue<int>* bqueue = (BlockQueue<int>*)args;int a = 1;while(true){bqueue->push(a);std::cout << "productor:" << a << std::endl;a++;sleep(1);}return nullptr;
}int main()
{pthread_t c, p;BlockQueue<int>* bq = new BlockQueue<int>();//创建生产者消费者线程pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);}
当生产者与消费者步调一致时,我们会发现生产者生产一个数据,消费者就会消费一个数据:
当生产者生产的快,消费者消费的慢时,阻塞队列满了就会导致生产者阻塞等待,只有当消费者被唤醒以后消费掉一个数据,此时生产者才会被唤醒继续生产数据:
当生产者生产的慢,消费者消费的快,因为最开始阻塞队列中并没有数据,所以消费者就会阻塞等待,当生产者生产一个数据以后,消费者就会被唤醒消费一个数据,然后生产者继续被唤醒生产数据,消费者消费数据,步调保持一致:
当我们满足某一条件时再唤醒对应的生产者或消费者,比如当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费;当阻塞队列当中存储的数据小于队列容器的一半时,再唤醒生产者线程进行生产。
// 向阻塞队列中插入数据(生产者)
void push(const T &in)
{// 加锁pthread_mutex_lock(&_mtx);while (isQueueFull()){// 如果生产者生产过程中数据满了,就阻塞等待pthread_cond_wait(&_full, &_mtx);}_bq.push(in);if (_bq.size() > _capacity / 2)// 唤醒消费者pthread_cond_signal(&_empty);// 解锁pthread_mutex_unlock(&_mtx);
}// 向阻塞队列中获取数据(消费者)
void pop(T &out)
{// 加锁pthread_mutex_lock(&_mtx);while (isQueueEmpty()){// 如果消费者消费过程中数据空了,就阻塞等待pthread_cond_wait(&_empty, &_mtx);}out = _bq.front();_bq.pop();if (_bq.size() <= _capacity / 2)// 唤醒生产者pthread_cond_signal(&_full);// 解锁pthread_mutex_unlock(&_mtx);
}
我们仍然让生产者生产的快,消费者消费的慢。运行代码后生产者还是一瞬间将阻塞队列打满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程,而是当阻塞队列当中的数据小于队列容器的一半时,才会唤醒生产者线程进行生产。
基于计算任务的生产者消费者模型
当然,实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,我们这样做只是为了测试代码的正确性。
由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据。
例如,我们想要实现一个基于计算任务的生产者消费者模型,此时我们只需要定义一个Task类,这个类当中需要包含一个func_t成员函数:
#pragma once#include <iostream>
#include <functional>typedef std::function<int(int, int)> func_t;class Task
{
public:Task(){}Task(int x, int y, func_t func) : _x(x), _y(y), _func(func){}~Task(){}int operator()(){return _func(_x, _y);}public:int _x;int _y;func_t _func;
};
同时我们也可以将锁进行一个封装,采用RAII形式的加锁解锁风格,创建锁对象自动调用构造函数加锁,除了作用域自动调用析构函数解锁。
#pragma once#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t *mtx) : _mtx(mtx){}void lock(){std::cout << "需要进行加锁" << std::endl;pthread_mutex_lock(_mtx);}void unlock(){std::cout << "需要进行解锁" << std::endl;pthread_mutex_unlock(_mtx);}~Mutex(){}private:pthread_mutex_t *_mtx;
};class LockGuard
{
public:LockGuard(Mutex mtx) :_mtx(mtx){_mtx.lock();}~LockGuard(){_mtx.unlock();}
private:Mutex _mtx;
};
此时我们的BlockQueue.hpp
中插入和获取数据代码就可以优化为:
// 向阻塞队列中插入数据(生产者)
void push(const T &in)
{LockGuard lockguard(&_mtx);while (isQueueFull()){// 如果生产者生产过程中数据满了,就阻塞等待pthread_cond_wait(&_full, &_mtx);}_bq.push(in);// 唤醒消费者pthread_cond_signal(&_empty);}// 向阻塞队列中获取数据(消费者)
void pop(T &out)
{LockGuard lockguard(&_mtx);while (isQueueEmpty()){// 如果消费者消费过程中数据空了,就阻塞等待pthread_cond_wait(&_empty, &_mtx);}out = _bq.front();_bq.pop();// 唤醒生产者pthread_cond_signal(&_full);
}
运行代码,当生产者向阻塞队列中写入一个数据后,随即消费者就会被唤醒,获取数据,也就是进行计算操作:
同样,我们也可以创建多个线程进行计算:
相关文章:

Linux生产者消费者模型
生产者消费者模型 生产者消费者模型生产者消费者模型的概念生产者消费者模型的特点生产者消费者模型优点 基于BlockingQueue的生产者消费者模型基于阻塞队列的生产者消费者模型模拟实现基于阻塞队列的生产消费模型 生产者消费者模型 生产者消费者模型的概念 生产者消费者模式就…...

【Qt-20】Qt信号与槽
一、什么是信号和槽 信号是特定情况下被发射的事件,发射信号使用emit关键字,定义信号使用signals关键字,在signals前面不能使用public、private、protected等限定符,信号只用声明,不需也不能对其进行定义实现。另外&am…...

“智能+”时代,深维智信如何借助阿里云打造AI内容生成系统
云布道师 前言: 随着数字经济的发展,线上数字化远程销售模式越来越成为一种主流,销售流程也演变为线上视频会议、线下拜访等多种方式的结合。根据 Gartner 报告,到 2025 年 60% 的 B2B 销售组织将从基于经验和直觉的销售转变为数…...
selenium 自动化测试——WebDriver API
控制浏览器 控制浏览器窗口大小:set_window_size()方法 设置全屏模式下运行:maximize_window()方法 from selenium import webdriver from selenium.webdriver.common.by import By import timedriver webdriver.Chrome() driver.get("http://w…...

【实战】学习 Electron:构建跨平台桌面应用
文章目录 一、Electron 简介二、Electron 的优势1. 学习曲线平缓2. 丰富的生态系统3. 跨平台支持4. 开源和社区支持 三、Electron 的使用1. 安装 Node.js2. 安装 Electron3. 创建项目4. 初始化项目5. 安装依赖6. 创建主进程文件7. 创建渲染进程文件8. 打包应用程序9. 运行应用程…...

Python开发之二维数组空缺值的近邻填充
Python开发之二维数组空缺值的填充 1 实现一,任意位置填充2 实现二,填充内部3 实现三,只填充边缘,不包括四个角 前言:主要实现二维数据里面某一个数据的缺失,用缺失的近邻数据进行均值填充,可以…...

vue使用pdf 导出当前页面,(jspdf, html2canvas )
需要安装两个插件 npm install html2canvas jspdfyarn add html2canvas jspdf<div class"app-container" id"pdfPage"><!--这个放你需要导出的内容--> </div><el-button size"mini" click"onExportPdf">导出…...
【oracle删除表 回滚操作】
oracle数据回滚 oracle表在被误删后,一定时间内,可以采取以下方法进行恢复: 1、先查询数据库当前时间 select to_char(sysdate,‘yyyy-mm-dd hh24:mi:ss’) from dual;2、通过当前时间往前推时间,选择想要恢复的时间点 select * from 表名…...

Vue3 + TypeScript
Vue3 TS开发环境创建 1. 创建环境 vite除了支持基础阶段的纯TS环境之外,还支持 Vue TS开发环境的快速创建, 命令如下: $ npm create vitelatest vue-ts-pro -- --template vue-ts 说明: npm create vitelatest 基于最新版本的vite进行…...

软件测试/测试开发丨南科大计算机系本科生获“火焰杯”软件测试高校就业选拔赛一等奖
2022年12月2日,计算机系党总支书记、副系主任王琦副教授在工学院南楼551会议室为19级徐驰同学颁发第二届“火焰杯”软件测试开发选拔赛一等奖奖项,为刘烨庞助理教授颁发赛事优秀指导老师奖项。徐驰同学于2022年4月获得该赛事全国总决赛第一名,…...
访问 github 问题解决方法
一、macOS版 PS. Windows 版的还没试,不过应该也差不多 1.基本信息 硬件:MacBook Pro 2017 (A1707) 系统:macOS 13.6 (Ventura) 应用:SwitchHosts 4.1.2 (Releases oldj/SwitchHosts GitHub) hosts内容网站:ht…...
供应QCA8075原装芯片
长期供应各品牌原装芯片: SST39VF040-70-4I-NH AR9344 DC3A BGA USB2422 QFN24 W9751G6KB-251 RTL8211EG-VB-CG HI3535-RBCV100 MX25L25635FMI-10G USB2240I-AEZG EM620FV8BS-70LF HXI15H4G160AF-13K 1PQ8064/BGA-519 USB4604I-1080HN SCB15H2G160A…...

在Maven中配置代理服务器的详细教程
在Maven中配置代理服务器的详细教程如下: 首先,确保您已经安装了Maven。创建一个新的Maven项目。在命令行中输入以下命令: mvn archetype:generate -DgroupIdcom.example -DartifactIdmy-app -DarchetypeArtifactIdmaven-archetype-quickst…...
QStringListModel
创建模型: QStringListModel* model new QStringListModel(this); 初始化列表: QStringList strList;strList << QStringLiteral("北京") << QStringLiteral("上海") << QStringLiteral("天津") &l…...
Linux下的文件管理
一、Linux下文件命名规则 1、可以使用哪些字符? 理论上除了字符“/”之外,所有的字符都可以使用,但是要注意,在目录名或文件名中,不建议使用某些特殊字符,例如, <、>、?、* …...

RN:报错info Opening flipper://null/React?device=React%20Native
背景 在 ios 上使用 debug 模式的时候,报错:info Opening flipper://null/React?deviceReact%20Native,我找到了这个 issue 其实也可以看到现在打开 debug,是 open debug,也不是之前的 debug for chrome 了…...

请问嵌入式或迁移学习要学什么?
请问嵌入式或迁移学习要学什么? 学习嵌入式和迁移学习是一个很好的方向,尤其是在军I领域。以下是一些你可以提前学习的基本 知识和步骤: 嵌入式系统:最近很多小伙伴找我,说想要一些嵌入式资料,然后我根据自己从业十年经验&#…...

数据结构-----图(Graph)论必知必会知识
目录 前言 图的基本概念 1.什么是图? 2 .图的相关术语 3 .有向图和无向图 4.简单图和多重图 5.连通图、强连通图、非连通图 6.权与网 7.子图和(强)连通分量 8.生成树和生成森林 前言 今天我们学习一种新的数据结构-----图,大家在日常生活中经常都…...

外汇天眼:法国金融市场管理局(AMF)致力于向零售投资者提供有关金融产品费用的信息
法国金融市场管理局(AMF)已经发布了一份专为专业人士准备的指南,以便他们使用更易于理解和比较的术语,以帮助客户更好地理解和比较费用。 AMF在其网站上推出了一个新的费用信息栏目,提供教育内容和工具,帮…...
【PythonGIS】基于Python批量合并矢量数据
老样子最近有项目需要将N个矢量文件合并成一个,总不能用ArcGIS一个个导入吧。所以我就想着用Python编个程序实现批量合并矢量。我之前也发了一些关于Python操作矢量数据的文章:【Python&GIS】Python处理矢量数据的基本操作(查询、修改、删…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别
一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)
在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...

MFC 抛体运动模拟:常见问题解决与界面美化
在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...