当前位置: 首页 > news >正文

Linux生产者消费者模型

生产者消费者模型

  • 生产者消费者模型
    • 生产者消费者模型的概念
    • 生产者消费者模型的特点
    • 生产者消费者模型优点
  • 基于BlockingQueue的生产者消费者模型
    • 基于阻塞队列的生产者消费者模型
    • 模拟实现基于阻塞队列的生产消费模型

生产者消费者模型

生产者消费者模型的概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

在这里插入图片描述

生产者消费者模型的特点

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

  • 三种关系:生产者与生产者(竞争与互斥关系)、消费者与消费者(竞争与互斥关系)、生产者与消费者(互斥与同步关系);
  • 两种角色:生产者与消费者;
  • 一个交易场所:通常指内存中的一段缓冲区。

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

因为生产者与消费者之间的容器会被多个访问流进行访问,所以我们就需要将该临界资源使用互斥锁保护起来,防止线程安全问题的发生,因此所有的生产者和消费者都会竞争式的申请锁,生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。

生产者和消费者之间为什么会存在同步关系?

如果生产者一直生产,当空间被填满以后,生产者就会停止生产,消费者也是一样,如果消费者一直消费,空间中数据被消耗完了,消费者也会停止消费。

虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。

生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。

对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合。

基于BlockingQueue的生产者消费者模型

基于阻塞队列的生产者消费者模型

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
在这里插入图片描述
阻塞队列的特点在于:

  • 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素。
  • 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。

知识联系: 看到以上阻塞队列的描述,我们很容易想到的就是管道,而阻塞队列最典型的应用场景实际上就是管道的实现。

模拟实现基于阻塞队列的生产消费模型

我们先以单生产者,单消费者为例:

其中的BlockQueue就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的queue进行实现,下面我们进行一个简单的封装:

#pragma once#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>
#include <mutex>#define NUM 5template <class T>
class BlockQueue
{
private:// 判断队列是否满了bool isQueueFull(){return _bq.size() == _capacity;}// 判断队列是否为空bool isQueueEmpty(){return _bq.size() == 0;}public:// 构造函数BlockQueue(int capacity = NUM) : _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}// 向阻塞队列中插入数据(生产者)void push(const T &in){// 加锁pthread_mutex_lock(&_mtx);while (isQueueFull()){// 如果生产者生产过程中数据满了,就阻塞等待pthread_cond_wait(&_full, &_mtx);}_bq.push(in);// 解锁pthread_mutex_unlock(&_mtx);// 唤醒消费者pthread_cond_signal(&_empty);}// 向阻塞队列中获取数据(消费者)void pop(T &out){// 加锁pthread_mutex_lock(&_mtx);while (isQueueEmpty()){// 如果消费者消费过程中数据空了,就阻塞等待pthread_cond_wait(&_empty, &_mtx);}out = _bq.front();_bq.pop();// 解锁pthread_mutex_unlock(&_mtx);// 唤醒生产者pthread_cond_signal(&_full);}// 析构函数~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}private:// 阻塞队列std::queue<T> _bq;// 阻塞队列最大容器个数int _capacity;// 通过互斥锁来保证队列安全pthread_mutex_t _mtx;// 用来表示_bq是否为满的条件pthread_cond_t _full;// 用来表示_bq是否为空的条件pthread_cond_t _empty;
};

上述代码中需要注意以下几点:

  1. 我们实现的是单生产者与单消费者的生产者消费者模型,所以我们不需要维护生产者与生产者,消费者与消费者的关系,只需要维护生产者与消费者之间的关系;
  2. 我们将BlockQueue中的参数模板化,就不会局限于一种类型,以后就可以很好的进行复用;
  3. 我们将阻塞队列最大容器个数设置为5,表示阻塞队列中存在5个数据以后就不会在进行生生产了,此时你生产者被阻塞;
  4. 由于生产者与消费者都会访问阻塞队列,阻塞队列即为临界资源,我们需要增加互斥锁来保证线程安全的问题;
  5. 生产者向阻塞队列中插入数据时,如果阻塞队列满了,生产者就会被阻塞进行等待,直到消费者获取数据完成以后,阻塞队列中存在空余空间,唤醒生产者,进行生产;同理,消费者获取数据时,如果阻塞队列空了,消费者就会被阻塞进行等待,直到生产者生产数据完成以后,唤醒消费者,进行消费;
  6. 我们需要定义两个条件变量_full_empty描述阻塞队列的状态,进而才可以判断何时运行,何时等待;
  7. pthread_cond_wait除了会传入一个条件变量以外还会传入一个互斥锁,我们会发现,我们是在临界区中进行等待的,我们此时还处于持有锁状态,pthread_cond_wait第二个参数意义就在于成功调用wait之后,传入的锁,会被自动释放,当被唤醒的时候,就会自动获取线程锁;

判断是否满足生产消费条件时不能用if,而应该用while:

  1. pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。
  2. 其次,在多消费者的情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。
  3. 为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。

在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据:

#include "BlockQueue.hpp"void* consumer(void* args)
{BlockQueue<int>* bqueue = (BlockQueue<int>*)args;while(true){int a;bqueue->pop(a);std::cout << "consumer:" << a << std::endl;sleep(1);}return nullptr;
}void* productor(void* args)
{BlockQueue<int>* bqueue = (BlockQueue<int>*)args;int a = 1;while(true){bqueue->push(a);std::cout << "productor:" << a << std::endl;a++;sleep(1);}return nullptr;
}int main()
{pthread_t c, p;BlockQueue<int>* bq = new  BlockQueue<int>();//创建生产者消费者线程pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);}

当生产者与消费者步调一致时,我们会发现生产者生产一个数据,消费者就会消费一个数据:
在这里插入图片描述

当生产者生产的快,消费者消费的慢时,阻塞队列满了就会导致生产者阻塞等待,只有当消费者被唤醒以后消费掉一个数据,此时生产者才会被唤醒继续生产数据:
在这里插入图片描述
当生产者生产的慢,消费者消费的快,因为最开始阻塞队列中并没有数据,所以消费者就会阻塞等待,当生产者生产一个数据以后,消费者就会被唤醒消费一个数据,然后生产者继续被唤醒生产数据,消费者消费数据,步调保持一致:
在这里插入图片描述
当我们满足某一条件时再唤醒对应的生产者或消费者,比如当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费;当阻塞队列当中存储的数据小于队列容器的一半时,再唤醒生产者线程进行生产。

// 向阻塞队列中插入数据(生产者)
void push(const T &in)
{// 加锁pthread_mutex_lock(&_mtx);while (isQueueFull()){// 如果生产者生产过程中数据满了,就阻塞等待pthread_cond_wait(&_full, &_mtx);}_bq.push(in);if (_bq.size() > _capacity / 2)// 唤醒消费者pthread_cond_signal(&_empty);// 解锁pthread_mutex_unlock(&_mtx);
}// 向阻塞队列中获取数据(消费者)
void pop(T &out)
{// 加锁pthread_mutex_lock(&_mtx);while (isQueueEmpty()){// 如果消费者消费过程中数据空了,就阻塞等待pthread_cond_wait(&_empty, &_mtx);}out = _bq.front();_bq.pop();if (_bq.size() <= _capacity / 2)// 唤醒生产者pthread_cond_signal(&_full);// 解锁pthread_mutex_unlock(&_mtx);
}

在这里插入图片描述

我们仍然让生产者生产的快,消费者消费的慢。运行代码后生产者还是一瞬间将阻塞队列打满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程,而是当阻塞队列当中的数据小于队列容器的一半时,才会唤醒生产者线程进行生产。

基于计算任务的生产者消费者模型

当然,实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,我们这样做只是为了测试代码的正确性。

由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据。
例如,我们想要实现一个基于计算任务的生产者消费者模型,此时我们只需要定义一个Task类,这个类当中需要包含一个func_t成员函数:

#pragma once#include <iostream>
#include <functional>typedef std::function<int(int, int)> func_t;class Task
{
public:Task(){}Task(int x, int y, func_t func) : _x(x), _y(y), _func(func){}~Task(){}int operator()(){return _func(_x, _y);}public:int _x;int _y;func_t _func;
};

同时我们也可以将锁进行一个封装,采用RAII形式的加锁解锁风格,创建锁对象自动调用构造函数加锁,除了作用域自动调用析构函数解锁。

#pragma once#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t *mtx) : _mtx(mtx){}void lock(){std::cout << "需要进行加锁" << std::endl;pthread_mutex_lock(_mtx);}void unlock(){std::cout << "需要进行解锁" << std::endl;pthread_mutex_unlock(_mtx);}~Mutex(){}private:pthread_mutex_t *_mtx;
};class LockGuard
{
public:LockGuard(Mutex mtx) :_mtx(mtx){_mtx.lock();}~LockGuard(){_mtx.unlock();}
private:Mutex _mtx;
};

此时我们的BlockQueue.hpp中插入和获取数据代码就可以优化为:

// 向阻塞队列中插入数据(生产者)
void push(const T &in)
{LockGuard lockguard(&_mtx);while (isQueueFull()){// 如果生产者生产过程中数据满了,就阻塞等待pthread_cond_wait(&_full, &_mtx);}_bq.push(in);// 唤醒消费者pthread_cond_signal(&_empty);}// 向阻塞队列中获取数据(消费者)
void pop(T &out)
{LockGuard lockguard(&_mtx);while (isQueueEmpty()){// 如果消费者消费过程中数据空了,就阻塞等待pthread_cond_wait(&_empty, &_mtx);}out = _bq.front();_bq.pop();// 唤醒生产者pthread_cond_signal(&_full);
}

运行代码,当生产者向阻塞队列中写入一个数据后,随即消费者就会被唤醒,获取数据,也就是进行计算操作:
在这里插入图片描述
同样,我们也可以创建多个线程进行计算:
在这里插入图片描述

相关文章:

Linux生产者消费者模型

生产者消费者模型 生产者消费者模型生产者消费者模型的概念生产者消费者模型的特点生产者消费者模型优点 基于BlockingQueue的生产者消费者模型基于阻塞队列的生产者消费者模型模拟实现基于阻塞队列的生产消费模型 生产者消费者模型 生产者消费者模型的概念 生产者消费者模式就…...

【Qt-20】Qt信号与槽

一、什么是信号和槽 信号是特定情况下被发射的事件&#xff0c;发射信号使用emit关键字&#xff0c;定义信号使用signals关键字&#xff0c;在signals前面不能使用public、private、protected等限定符&#xff0c;信号只用声明&#xff0c;不需也不能对其进行定义实现。另外&am…...

“智能+”时代,深维智信如何借助阿里云打造AI内容生成系统

云布道师 前言&#xff1a; 随着数字经济的发展&#xff0c;线上数字化远程销售模式越来越成为一种主流&#xff0c;销售流程也演变为线上视频会议、线下拜访等多种方式的结合。根据 Gartner 报告&#xff0c;到 2025 年 60% 的 B2B 销售组织将从基于经验和直觉的销售转变为数…...

selenium 自动化测试——WebDriver API

控制浏览器 控制浏览器窗口大小&#xff1a;set_window_size()方法 设置全屏模式下运行&#xff1a;maximize_window()方法 from selenium import webdriver from selenium.webdriver.common.by import By import timedriver webdriver.Chrome() driver.get("http://w…...

【实战】学习 Electron:构建跨平台桌面应用

文章目录 一、Electron 简介二、Electron 的优势1. 学习曲线平缓2. 丰富的生态系统3. 跨平台支持4. 开源和社区支持 三、Electron 的使用1. 安装 Node.js2. 安装 Electron3. 创建项目4. 初始化项目5. 安装依赖6. 创建主进程文件7. 创建渲染进程文件8. 打包应用程序9. 运行应用程…...

Python开发之二维数组空缺值的近邻填充

Python开发之二维数组空缺值的填充 1 实现一&#xff0c;任意位置填充2 实现二&#xff0c;填充内部3 实现三&#xff0c;只填充边缘&#xff0c;不包括四个角 前言&#xff1a;主要实现二维数据里面某一个数据的缺失&#xff0c;用缺失的近邻数据进行均值填充&#xff0c;可以…...

vue使用pdf 导出当前页面,(jspdf, html2canvas )

需要安装两个插件 npm install html2canvas jspdfyarn add html2canvas jspdf<div class"app-container" id"pdfPage"><!--这个放你需要导出的内容--> </div><el-button size"mini" click"onExportPdf">导出…...

【oracle删除表 回滚操作】

oracle数据回滚 oracle表在被误删后&#xff0c;一定时间内&#xff0c;可以采取以下方法进行恢复: 1、先查询数据库当前时间 select to_char(sysdate,‘yyyy-mm-dd hh24:mi:ss’) from dual;2、通过当前时间往前推时间&#xff0c;选择想要恢复的时间点 select * from 表名…...

Vue3 + TypeScript

Vue3 TS开发环境创建 1. 创建环境 vite除了支持基础阶段的纯TS环境之外&#xff0c;还支持 Vue TS开发环境的快速创建, 命令如下&#xff1a; $ npm create vitelatest vue-ts-pro -- --template vue-ts 说明&#xff1a; npm create vitelatest 基于最新版本的vite进行…...

软件测试/测试开发丨南科大计算机系本科生获“火焰杯”软件测试高校就业选拔赛一等奖

2022年12月2日&#xff0c;计算机系党总支书记、副系主任王琦副教授在工学院南楼551会议室为19级徐驰同学颁发第二届“火焰杯”软件测试开发选拔赛一等奖奖项&#xff0c;为刘烨庞助理教授颁发赛事优秀指导老师奖项。徐驰同学于2022年4月获得该赛事全国总决赛第一名&#xff0c…...

访问 github 问题解决方法

一、macOS版 PS. Windows 版的还没试&#xff0c;不过应该也差不多 1.基本信息 硬件&#xff1a;MacBook Pro 2017 (A1707) 系统&#xff1a;macOS 13.6 (Ventura) 应用&#xff1a;SwitchHosts 4.1.2 (Releases oldj/SwitchHosts GitHub) hosts内容网站&#xff1a;ht…...

供应QCA8075原装芯片

长期供应各品牌原装芯片&#xff1a; SST39VF040-70-4I-NH AR9344 DC3A BGA USB2422 QFN24 W9751G6KB-251 RTL8211EG-VB-CG HI3535-RBCV100 MX25L25635FMI-10G USB2240I-AEZG EM620FV8BS-70LF HXI15H4G160AF-13K 1PQ8064/BGA-519 USB4604I-1080HN SCB15H2G160A…...

在Maven中配置代理服务器的详细教程

在Maven中配置代理服务器的详细教程如下&#xff1a; 首先&#xff0c;确保您已经安装了Maven。创建一个新的Maven项目。在命令行中输入以下命令&#xff1a; mvn archetype:generate -DgroupIdcom.example -DartifactIdmy-app -DarchetypeArtifactIdmaven-archetype-quickst…...

QStringListModel

创建模型&#xff1a; QStringListModel* model new QStringListModel(this); 初始化列表&#xff1a; QStringList strList;strList << QStringLiteral("北京") << QStringLiteral("上海") << QStringLiteral("天津") &l…...

Linux下的文件管理

一、Linux下文件命名规则 1、可以使用哪些字符&#xff1f; 理论上除了字符“/”之外&#xff0c;所有的字符都可以使用&#xff0c;但是要注意&#xff0c;在目录名或文件名中&#xff0c;不建议使用某些特殊字符&#xff0c;例如&#xff0c; <、>、&#xff1f;、* …...

RN:报错info Opening flipper://null/React?device=React%20Native

背景 在 ios 上使用 debug 模式的时候&#xff0c;报错&#xff1a;info Opening flipper://null/React?deviceReact%20Native&#xff0c;我找到了这个 issue 其实也可以看到现在打开 debug&#xff0c;是 open debug&#xff0c;也不是之前的 debug for chrome 了&#xf…...

请问嵌入式或迁移学习要学什么?

请问嵌入式或迁移学习要学什么&#xff1f; 学习嵌入式和迁移学习是一个很好的方向&#xff0c;尤其是在军I领域。以下是一些你可以提前学习的基本 知识和步骤: 嵌入式系统:最近很多小伙伴找我&#xff0c;说想要一些嵌入式资料&#xff0c;然后我根据自己从业十年经验&#…...

数据结构-----图(Graph)论必知必会知识

目录 前言 图的基本概念 1.什么是图&#xff1f; 2 .图的相关术语 3 .有向图和无向图 4.简单图和多重图 5.连通图、强连通图、非连通图 6.权与网 7.子图和(强)连通分量 8.生成树和生成森林 前言 今天我们学习一种新的数据结构-----图&#xff0c;大家在日常生活中经常都…...

外汇天眼:法国金融市场管理局(AMF)致力于向零售投资者提供有关金融产品费用的信息

法国金融市场管理局&#xff08;AMF&#xff09;已经发布了一份专为专业人士准备的指南&#xff0c;以便他们使用更易于理解和比较的术语&#xff0c;以帮助客户更好地理解和比较费用。 AMF在其网站上推出了一个新的费用信息栏目&#xff0c;提供教育内容和工具&#xff0c;帮…...

【PythonGIS】基于Python批量合并矢量数据

老样子最近有项目需要将N个矢量文件合并成一个&#xff0c;总不能用ArcGIS一个个导入吧。所以我就想着用Python编个程序实现批量合并矢量。我之前也发了一些关于Python操作矢量数据的文章&#xff1a;【Python&GIS】Python处理矢量数据的基本操作&#xff08;查询、修改、删…...

精益求精:使用Ansible集中式自动备份核心数据

1、引言 在当今数字化时代&#xff0c;数据是企业和组织的核心资产。为了确保数据的安全性和可恢复性&#xff0c;备份是至关重 要的。然而&#xff0c;手动备份数据可能会繁琐且容易出错&#xff0c;特别是在面对大规模和分布式的数据存储情况下。幸运的是&#xff0c;Ansibl…...

大数据高级面试题

大数据高级面试题 Kafka的producer如何实现幂等性? Producer 幂等性 Producer 的幂等性指的是当发送同一条消息时&#xff0c;数据在 Server 端只会被持久化一次&#xff0c;数据不丟不重&#xff0c;但是这里的幂等性是有条件的&#xff1a; 只能保证 Producer 在单个会话内…...

如何拦截响应内容并修改响应头

背景及需求描述 背景 记录分享下近期遇到并解决的困扰了比较久的问题&#xff1a;在不同系统微信生态发现同一个cos地址用window.open(url)打开在苹果和安卓设备的微信生态上表现不一致&#xff1a;对于文档类型&#xff0c;响应头Content-Type: application/pdf 在安卓微信上…...

分类预测 | Matlab实现WOA-GRU鲸鱼算法优化门控循环单元的数据多输入分类预测

分类预测 | Matlab实现WOA-GRU鲸鱼算法优化门控循环单元的数据多输入分类预测 目录 分类预测 | Matlab实现WOA-GRU鲸鱼算法优化门控循环单元的数据多输入分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现WOA-GRU鲸鱼算法优化门控循环单元的数据多输入…...

特定深度节点链表

题目链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 经典BFS与简单链表结合的题目。 #define MAX_DEPTH (1000)struct ListNode** listOfDepth(struct TreeNode* tree, int* returnSize) {*returnSize 0;struct ListNode **ans (…...

【css】背景换颜色

更换前 longin.html <!DOCTYPE html> <html lang"en" > <head><meta charset"UTF-8"><title>login</title><link href"/css/style.css" type"text/css" rel"stylesheet"><s…...

什么是美颜sdk?直播实时美颜sdk的工作流程和架构分析

在现代社交媒体和娱乐行业中&#xff0c;直播已经成为了一种受欢迎的娱乐形式&#xff0c;同时实时美颜也变得越来越重要。直播实时美颜SDK的工作流程和架构在这一领域起到了关键作用。本文将深入探讨这些SDK的内部机制&#xff0c;从而理解它们如何为用户提供出色的美颜效果。…...

第二证券:跌破3000点,热搜第一!

今天上午&#xff0c;“沪指开盘跌破3000点关口”冲上百度热搜榜榜首。 上午收盘&#xff0c;上证指数下跌0.27%&#xff0c;报2997.22点&#xff1b;深证成指下跌0.36%&#xff0c;创业板指下跌0.44%。 赛道股发力&#xff0c;光伏、风电、新能源轿车等板块盘中冲高。房地产…...

IJCAI2023【基于双曲空间探索的非独立同分布联邦学习】

1、介绍汇报的主题及汇报者 2、粗略介绍面临的挑战及出发点 3、介绍一下预备知识 4、解决方案 5、总览 6、实验设置 7、实验 8、结论...

实现Linux下Word转PDF、Java调用命令方式

使用 LibreOffice 实现 Word 转 PDF 和 Java 调用命令 1、 安装 LibreOffice 外网安装 # 一键安装 yum install -y libreoffice # 验证版本 libreoffice --version # Warning: -version is deprecated. Use --version instead. # LibreOffice 7.5.6.2 f654817fb68d6d4600d7…...