当前位置: 首页 > news >正文

Linux——生产者消费者模型

目录

一.为何要使用生产者消费者模型

 二.生产者消费者模型优点

 三.基于BlockingQueue的生产者消费者模型

1.BlockingQueue——阻塞队列

2.实现代码

 四.POSIX信号量

五.基于环形队列的生产消费模型


一.为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

 

 二.生产者消费者模型优点

  • 解耦:生产者和消费者不直接解除,无需关心对方的情况,仅仅自己与缓冲区解除。
  • 支持并发:并发的体现并不在于多个消费者同时从缓冲区中拿数据,也不是多个生产者同时从缓冲区放数据,而是消费者在处理拿到的数据的时候,生产者可以继续向缓冲区放数据。
  • 支持忙闲不均 :当生产者生产过快的时候,可以让生产者慢下来,当消费者消费过快的时候,可以让消费者慢下来。

 三.基于BlockingQueue的生产者消费者模型

 1.BlockingQueue——阻塞队列

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

2.实现代码

#include <iostream>
#include <string>
#include <queue>
#include <ctime>
#include <unistd.h>
#include <pthread.h>using namespace std;template <class T>
class BlockQueue
{
public:BlockQueue(size_t cap): _cap(cap){// 初始化条件变量pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);}void push(T date){// 将任务push进去队列,多线程加锁,每一只能一个线程push任务pthread_mutex_lock(&_mutex);while (_q.size() == _cap) // 如果队列已经满了,生产者要被阻塞{pthread_cond_wait(&_p_cond, &_mutex);}_q.push(date);// 当push任务成功的时候,需要将唤醒消费者来处理数据pthread_cond_signal(&_c_cond);pthread_mutex_unlock(&_mutex);}T pop(){// 将任务从队列中拿出来,多线程加锁,每一只能一个线程拿任务pthread_mutex_lock(&_mutex);// 如果队列是空的就将消费者阻塞while (isempty()){pthread_cond_wait(&_c_cond, &_mutex);}T tmp = _q.front();_q.pop();// 成功拿到数据以后,唤醒生产者继续生产任务pthread_cond_signal(&_p_cond);pthread_mutex_unlock(&_mutex);return tmp;}~BlockQueue(){pthread_cond_destroy(&_c_cond);pthread_cond_destroy(&_p_cond);}private:bool isempty(){return _q.empty();}bool isfull(){return _q.size() == _cap;}private:queue<T> _q; // 队列size_t _cap; // 容量pthread_cond_t _c_cond;                             // 消费者条件变量pthread_cond_t _p_cond;                             // 生产者条件变量pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
};

cp模型:

#include "BlockQueue.hpp"using namespace std;// 构建任务
struct Task
{Task(int a, int b, char op): _a(a), _b(b), _op(op){}char _op;      // 运算符int _a;        // 运算数1int _b;        // 运算数2int ret;       // 结果int _exitcode; // 退出码
};void *push_task(void *args)
{BlockQueue<Task> *pBQ = static_cast<BlockQueue<Task> *>(args);while (1){char op_arr[4] = {'+', '-', '*', '/'};int a = rand() % 10;int b = rand() % 10;char op = op_arr[(a * b) % 4];// 构建任务结构体Task tk(a, b, op);// push任务pBQ->push(tk);printf("%d %c %d =?\n", a, op, b);sleep(1);}return NULL;
}void *get_task(void *args)
{BlockQueue<Task> *pBQ = static_cast<BlockQueue<Task> *>(args);while (1){// 获取任务并处理Task tk = pBQ->pop();switch (tk._op){case '+':tk.ret = tk._a + tk._b;break;case '-':tk.ret = tk._a - tk._b;break;case '*':tk.ret = tk._a * tk._b;break;case '/':if (tk._b == 0){exit(-1);}tk.ret = tk._a / tk._b;break;default:break;}printf("%d %c %d = %d\n", tk._a, tk._op, tk._b, tk.ret);sleep(1);}return NULL;
}int main()
{BlockQueue<Task> BQ(5);pthread_t tid_c[4];pthread_t tid_p[4];srand(time(nullptr));// pushpthread_create(&tid_c[0], NULL, push_task, &BQ);pthread_create(&tid_c[1], NULL, push_task, &BQ);pthread_create(&tid_c[2], NULL, push_task, &BQ);pthread_create(&tid_c[3], NULL, push_task, &BQ);// getpthread_create(&tid_p[0], NULL, get_task, &BQ);pthread_create(&tid_p[1], NULL, get_task, &BQ);pthread_create(&tid_p[2], NULL, get_task, &BQ);pthread_create(&tid_p[3], NULL, get_task, &BQ);pthread_join(tid_c[0], NULL);pthread_join(tid_c[1], NULL);pthread_join(tid_c[2], NULL);pthread_join(tid_c[3], NULL);pthread_join(tid_p[0], NULL);pthread_join(tid_p[1], NULL);pthread_join(tid_p[2], NULL);pthread_join(tid_p[3], NULL);return 0;
}

测试结果:

 四.POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

定义信号量:

sem_t sem;

初始化信号量:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  1. pshared:0表示线程间共享,非零表示进程间共享。
  2. value:信号量初始值。

销毁信号量:

int sem_destroy(sem_t *sem);

等待信号量:

功能:等待信号量,会将信号量的值减1。

int sem_wait(sem_t *sem); //P()

发布信号量:

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。

int sem_post(sem_t *sem);//V()

说明:

  • 信号量本身就是一个计数器,用来描述可用资源的数目。
  • 信号量机制就像是我们看电影买票一样,是对资源的预定机制。
  • 只有申请到信号量才能对共享资源访问。
  • 只要我们申请信号量成功了,将来我们一定可以访问临界资源,就像看电影,我们只要买到了电影票,不管我们去不去电影院,电影院里一定有我们的位置。

五.基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性。

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。

代码:

RingQueue.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <ctime>
#include <cstdlib>
#include <pthread.h>
#include <vector>
#include <unistd.h>
#include <semaphore.h>
#include "mutex.hpp"
#include "Task.hpp"
using namespace std;const size_t size = 5;template <class T>
class RingQueue
{void P(sem_t &sem) // 申请信号量{sem_wait(&sem);}void V(sem_t &sem) // 释放信号量{sem_post(&sem);}public:RingQueue(int cap = size): _cap(cap), _index_space(0), _index_date(0){// 初始化信号量sem_init(&_sem_date, 0, 0);    // 数据信号量初始化为0sem_init(&_sem_space, 0, cap); // 空间信号量初始化为容量大小// 初始化锁pthread_mutex_init(&_mutex, nullptr);_rq.resize(_cap);}void push(const T date){// 申请空间信号量P(_sem_space);{MutexGuard lock(&_mutex);_rq[_index_date++] = date;_index_date %= _cap;}// 释放数据信号量V(_sem_date);}T pop(){// 申请数据信号量P(_sem_date);T tmp;{MutexGuard lock(&_mutex);tmp = _rq[_index_space++];_index_space %= _cap;}// 释放空间信号量V(_sem_space);return tmp;}~RingQueue(){// 释放信号量和互斥锁sem_destroy(&_sem_date);sem_destroy(&_sem_space);pthread_mutex_destroy(&_mutex);}private:vector<T> _rq;size_t _cap; // 容量sem_t _sem_space; // 记录环形队列的空间信号量sem_t _sem_date;  // 记录环形队列的数据信号量size_t _index_space; // 生产者的生产位置size_t _index_date;  // 消费者的消费位置pthread_mutex_t _mutex; // 容量
};

mutex.hpp:

class Mutex
{
public:Mutex(pthread_mutex_t *mutex): _mutex(mutex){}void lock(){pthread_mutex_lock(_mutex);}void unlock(){pthread_mutex_unlock(_mutex);}~Mutex(){}private:pthread_mutex_t *_mutex;
};class MutexGuard
{
public:MutexGuard(pthread_mutex_t *mutex): _mutex(mutex){_mutex.lock();}~MutexGuard(){_mutex.unlock();}private:Mutex _mutex;
};

Task.hpp:

#include <iostream>
#include <cstdio>
#include <ctime>
#include <cstdlib>struct Task
{Task(int a = 1, int b = 1, char op = '+'): _a(a), _b(b), _op(op){}void run(){switch (_op){case '+':_ret = _a + _b;break;case '-':_ret = _a - _b;break;case '*':_ret = _a * _b;break;case '/':if (_b == 0){_exitcode = -1;exit(1);}_ret = _a / _b;break;default:break;}}void showtask(){printf("producer:%d %c %d = ?\n", _a, _op, _b);}void showresult(){printf("consumer:%d %c %d = %d(exitcode:%d)\n", _a, _op, _b, _ret, _exitcode);}~Task() {}private:int _a;int _b;char _op;int _ret;int _exitcode = 0;
};

pthread.cc:

#include "RingQueue.hpp"void *run_p(void *args)
{char ops[4] = {'+', '-', '*', '/'};RingQueue<Task> *RQ = static_cast<RingQueue<Task> *>(args);while (1){int a = rand() % 100;int b = rand() % 100;int op = ops[(a * b) % 4];Task tk(a, b, op);RQ->push(tk);tk.showtask();sleep(1);}
}
void *run_c(void *args)
{RingQueue<Task> *RQ = static_cast<RingQueue<Task> *>(args);while (1){Task tk = RQ->pop();tk.run();tk.showresult();sleep(1);}
}int main()
{RingQueue<Task> *RQ = new RingQueue<Task>(5);srand(time(0));pthread_t tid_c[5];pthread_t tid_p[5];for (int i = 0; i < 5; i++){pthread_create(&tid_c[i], nullptr, run_c, RQ);pthread_create(&tid_p[i], nullptr, run_p, RQ);}for (int i = 0; i < 5; i++){pthread_join(tid_c[i], nullptr);pthread_join(tid_p[i], nullptr);}delete RQ;return 0;
}

相关文章:

Linux——生产者消费者模型

目录 一.为何要使用生产者消费者模型 二.生产者消费者模型优点 三.基于BlockingQueue的生产者消费者模型 1.BlockingQueue——阻塞队列 2.实现代码 四.POSIX信号量 五.基于环形队列的生产消费模型 一.为何要使用生产者消费者模型 生产者消费者模式就是通过一个容器来解决生…...

Oracle缓存表

Oracle缓存表&#xff08;db_buffer_pool&#xff09;由三部分组成&#xff1a; buffer_pool_defualt buffer_pool_keep buffer_pool_recycle 如果要把表钉死在内存中&#xff0c;也就是把表钉在keep区。相关的命令为&#xff1a; alter table 表名 storage(buffer_pool k…...

智能变电站自动化系统的应用与产品选型

摘要&#xff1a;现如今&#xff0c;智能变电站发展已经成为了电力系统发展过程中的内容&#xff0c;如何提高智能变电站的运行效率也成为电力系统发展的一个重要目标&#xff0c;为了能够更好地促进电力系统安全稳定运行&#xff0c;本文则就智能变电站自动化系统的实现进行了…...

reactnative 底部tab页面@react-navigation/bottom-tabs

使用react-navigation/native做的页面导航和tab‘ 官网&#xff1a;https://reactnavigation.org/docs/getting-started 效果图 安装 npm install react-navigation/nativenpm install react-navigation/bottom-tabs封装tabbar.js import { View, StyleSheet, Image } from …...

运维中心—监控大盘

一、监控大盘内容分类 1、告警 2、业务趋势 3、异常码 4、主机 5、服务状态 6、系统账单 二、API分类 【基础数据】 1、分组查询各自子系统 2、子系统查询名下各个微服务 【主机】 根据分组查询主机信息&#xff0c;按照子系统分组&#xff0c;按照CPU和内存排序 步骤&#xf…...

Node.js的安装

直接在浏览器中搜索Node.js即可 打开下载好的文件 验证是否安装成功 在cmd中输入 node -v&#xff0c;若结果为版本号那就是成功的 环境配置 配置全局模块所在的路径缓存cache的路径 在安装目录中新建两个文件夹&#xff0c;文件夹名为:node_cache和node_global 输…...

vsCode git 修改、清空、重置、保存账号名密码

1、保存账号名密码&#xff0c;之后拉取代码都不用重新输入&#xff1a; git config --global credential.helper store 2、查看git用户名&#xff1a; git config user.name 3、清空所有的用户名和密码&#xff1a; git config --system --unset credential.helper 4、清…...

Docker 安装oracle12c容器并创建新用户

Docker 安装oracle12c容器并创建新用户 下载镜像 docker pull truevoly/oracle-12c启动镜像 8080和22端口没有映射出来&#xff0c;有需要自己 docker run -d -p 8123:1521 -restartalways --privilegedtrue -v /data/docker/Oracle12c_sichuan:/u01/app/oracle/ --name oracle…...

LabVIEW中管理大型数据

LabVIEW中管理大数据 LabVIEW的最大优势之一是自动内存管理。这种内存管理允许用户轻松创建字符串、数组和集群&#xff0c;而无需C/C用户经常担心。但是&#xff0c;这种内存管理设计为绝对安全&#xff0c;因此数据被非常频繁地复制。这通常不会造成任何问题&#xff0c;但是…...

dirsearch网站目录暴力破解

介绍&#xff1a; dirsearch是一个基于python3的命令行工具&#xff0c;常用于暴力扫描页面结构&#xff0c;包括网页中的目录和文件。相比其他扫描工具disearch的特点是&#xff1a; 支持HTTP代理多线程支持多种形式的网页&#xff08;asp,php&#xff09;生成报告&#xff0…...

【数据结构】线性表(三)循环链表的各种操作(创建、插入、查找、删除、修改、遍历打印、释放内存空间)

目录 线性表的定义及其基本操作&#xff08;顺序表插入、删除、查找、修改&#xff09; 四、线性表的链接存储结构 1. 单链表 2. 循环链表 a. 循环链表节点结构体 b. 创建新节点 c. 在循环链表末尾插入节点 d. 删除循环链表中指定值的节点 e. 在循环链表中查找指定值的…...

项目通用pom.xml文件模版

pom.xml模版文件 <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/…...

短视频矩阵系统源码---开发

一、智能剪辑、矩阵分发、无人直播、爆款文案于一体独立应用开发 抖去推----主要针对本地生活的----移动端(小程序软件系统&#xff0c;目前是全国源头独立开发)&#xff0c;开发功能大拆解分享&#xff0c;功能大拆解&#xff1a; 7大模型剪辑法&#xff08;数学阶乘&#x…...

vue3点击表格某个单元格文本就切换成输入框,其他单元格不变化

<el-table :data"data.tableData" height"60vh" border scrollbar-aways-on><el-table-column label"序号" type"index" width"80" fixed /><el-table-column label"操作" width"120" f…...

持续集成部署-k8s-资源调度:HPA - Pod 基于负载指标自动水平扩容缩容

首先我们找一个 Deployment 配置文件: nginx-deploy.yaml apiVersion: apps/v1 # deployment api 版本 kind: Deployment # 资源类型为 deployment metadata: # 元信息labels: # 标签app: nginx-deploy # 具体的 key: value 配置形式name: nginx-deploy...

RemObjects Elements 12.0 Crack

Elements 是一个现代多功能软件开发工具链。 它支持六种流行的编程语言&#xff1a;Oxygene (Object Pascal)、C#、Java、Mercury (Visual Basic.NET™)、Go 和 Swift&#xff0c;适用于所有现代平台。 使用 Elements&#xff0c;您可以为您喜欢的任何平台进行编程- 无论是单…...

STM32标准外设库下载(下载地址与步骤详解)

文章目录 1. 概述2. 官方下载地址3. 步骤详解3.1 打开官网3.2 工具与软件 ➡ 嵌入式软件 ➡ MEMS软件3.3 微控制器软件 ➡ STM32微控制器软件 ➡ STM32标准外设软件库 ➡ 选择产品系列3.4 选择版本 ➡ 点击下载3.5 点击“接受” ➡ 填写邮箱信息 ➡ 点击“下载”3.6 点击接收到…...

【912.排序数组】

目录 一、题目描述二、算法原理2.1快速排序2.2归并排序 三、代码实现3.1快排代码实现3.2归并代码实现 一、题目描述 二、算法原理 2.1快速排序 2.2归并排序 三、代码实现 3.1快排代码实现 class Solution { public:int getRandom(int left,int right,vector<int>&…...

【动态规划】583. 两个字符串的删除操作、72. 编辑距离

提示&#xff1a;努力生活&#xff0c;开心、快乐的一天 文章目录 583. 两个字符串的删除操作&#x1f4a1;解题思路&#x1f914;遇到的问题&#x1f4bb;代码实现&#x1f3af;题目总结 72. 编辑距离&#x1f4a1;解题思路&#x1f914;遇到的问题&#x1f4bb;代码实现&…...

Gradient conjugate priors and multi-layer neural networks

动机 先验参数 m , α , β , v m,\alpha,\beta,v m,α,β,v和随机变量 τ \tau τ KL散度的形式是&#xff1a; Dynamics of m , α , β , v m,\alpha,\beta,v m,α,β,v Dynamics of m , β , v m,\beta,v m,β,v for a fixed α \alpha α 绿色轨迹连接初始点和目标点…...

RestClient

什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端&#xff0c;它允许HTTP与Elasticsearch 集群通信&#xff0c;而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级&#xff…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试

作者&#xff1a;Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位&#xff1a;中南大学地球科学与信息物理学院论文标题&#xff1a;BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接&#xff1a;https://arxiv.…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

《通信之道——从微积分到 5G》读书总结

第1章 绪 论 1.1 这是一本什么样的书 通信技术&#xff0c;说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号&#xff08;调制&#xff09; 把信息从信号中抽取出来&am…...

使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装

以下是基于 vant-ui&#xff08;适配 Vue2 版本 &#xff09;实现截图中照片上传预览、删除功能&#xff0c;并封装成可复用组件的完整代码&#xff0c;包含样式和逻辑实现&#xff0c;可直接在 Vue2 项目中使用&#xff1a; 1. 封装的图片上传组件 ImageUploader.vue <te…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)

要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况&#xff0c;可以通过以下几种方式模拟或触发&#xff1a; 1. 增加CPU负载 运行大量计算密集型任务&#xff0c;例如&#xff1a; 使用多线程循环执行复杂计算&#xff08;如数学运算、加密解密等&#xff09;。运行图…...