基于blockqueue的生产和消费模型
线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念:
同步:在保证数据安全的条件下,让线程按某种特定的顺序依次访问临界资源。
通过上一节的代码我们实现了一个多线程抢票的程序,但结果显示的是一个线程在疯狂的抢票,这就导致了其他线程抢占不到临界资源而导致的饥饿问题。
所以在抢代码的代码逻辑中,我们需要保证各个线程以同步的方式进行抢票。那如何实现呢?就需要用到信号量:
条件变量的接口函数:

条件变量的使用:
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>int tickets = 1000;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *start_routine(void *args)
{std::string name = static_cast<const char *>(args);while (true){pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex); // 为什么要有mutex,后面就说// 判断暂时省略std::cout << name << " -> " << tickets << std::endl;tickets--;pthread_mutex_unlock(&mutex);}
}int main()
{// 通过条件变量控制线程的执行pthread_t t[5];for (int i = 0; i < 5; i++){char *name = new char[64];snprintf(name, 64, "thread %d", i + 1);pthread_create(t+i, nullptr, start_routine, name);}while (true){sleep(1);// pthread_cond_signal(&cond);pthread_cond_broadcast(&cond);std::cout << "main thread wakeup one thread..." << std::endl;}for (int i = 0; i < 5; i++){pthread_join(t[i], nullptr);}return 0;
}
条件变量的理解:
我们用一个例子来说明:
没有条件变量就好比是面试官在一个办公室面试,只要应聘者得到门外挂着的钥匙进来就可以面试。这时一个应聘者抢到了钥匙进入了办公室,面试结束后想要将钥匙归还时感觉自己面的不好,这时自己离钥匙也是最近的,所以刚出门又拿起了钥匙继续面试。这钥匙就导致了其他应聘者拿不到而这个应聘者在疯狂的面试,很明显这不合理。
但是有了条件变量就好比是面试官在外面设置了一个等待区,面试完的人必须归还钥匙并且再次回到等待队列的最末尾重新排队,这就保证了各个应聘者都有机会面试。

条件不满足的时候,线程必须去某些定义好的条件变量上进行等待。
pthread_cond_wait第二个参数的意义(将锁传进去):该函数调用的时候,会以原子性的方式,将锁释放,并将线程挂起等待。用pthread_cond_sign将线程唤醒时,该线程会拿着你所传入的锁将代码继续向下运行。通过以上知识的铺垫让我们进入今天的正题:
生产者消费者模型
什么是生产者消费者模型呢?我们用一个形象的图来说明:

生产者消费者模型就是生产者可以将生产的数据存放在一个共享区,消费者从共享区中获得数据进行消费。由于是共享区,就要保证数据的安全性。当生产者生产一个数据时,另一个生产者不能在同一个数据上生产,不然会导致数据的不安全性。因此生产者和生产者之间是互斥关系的。同样消费者也不能同时消费同一个数据,不然可能会导致数据的不一致性,因此消费者和消费者之间是互斥关系的。如果这时一个生产者正在生产一个数据,而同时消费者也正在消费这个数据。就可能导致数据还没生产完全就已经被消费了,也会导致数据的不安全性。因此生产者和消费者之间是互斥关系,同时我们还希望生产者生产以后就有消费者来消费,消费者消费完一个就有生产者来生产,因此生产者和消费者之间是同步关系。
总结一下就是3种关系、两种角色、一个交易场所:

这一段特殊的缓冲区可以提前存放一批数据,这样消费者想消费的时候消费,生产者想什么时候生产就什么时候生产。解决了生产和消费两批线程忙闲不均的问题,是它们不具有强耦合的关系。
基于blockqueue的生产和消费模型

基于普通方式处理数据
代码实现:
//Main.cc#include <iostream>
#include <unistd.h>
#include <time.h>
#include "BlockQueue.hpp"using namespace std;void* consumer(void* args)
{BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);while(true){int val =0;bq->pop(&val);cout<<"我是消费者,我消费了一个数字:"<<val<<endl;sleep(1);}return nullptr;
}void* productor(void* args)
{BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);while(true){int val =rand()%10;bq->push(val);cout<<"我是生产者,我生产了一个数字:"<<val<<endl;//sleep(1);}return nullptr;
}int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueue<int>* queue =new BlockQueue<int>(5);pthread_t c,p;pthread_create(&c,nullptr,consumer,(void*)queue);pthread_create(&p,nullptr,productor,(void*)queue);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}//BlockQueue.hpp
#include <queue>
#include <pthread.h>
#include <iostream>
using namespace std;const int NUM =5;
template<class T>
class BlockQueue
{
public:BlockQueue(const int numsize =NUM):_maxsize(NUM){pthread_mutex_init(&_lock,nullptr);pthread_cond_init(&_ccond,nullptr);pthread_cond_init(&_ccond,nullptr);}void push(const T& in){pthread_mutex_lock(&_lock);while(is_Full()){//这里说明阻塞队列是满的,需要让生产者等待pthread_cond_wait(&_pcond,&_lock);}//这里说明阻塞队列至少有一个空位可以插入_queue.push(in);//唤醒消费者去消费pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_lock);}void pop(T* out){pthread_mutex_lock(&_lock);while(is_Empty()){//这里说明阻塞队列是空的,需要让消费者等待pthread_cond_wait(&_ccond,&_lock);}//这里说明阻塞队列至少有一个数据*out=_queue.front();_queue.pop();//唤醒生产者生产数据pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_lock);}~BlockQueue(){pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_ccond);pthread_cond_destroy(&_pcond);}private:bool is_Full(){return _queue.size()==_maxsize;}bool is_Empty(){return _queue.empty();}private:queue<T> _queue;int _maxsize;pthread_mutex_t _lock; //保护临界资源的锁pthread_cond_t _ccond; //消费者的条件变量pthread_cond_t _pcond; //生产者的条件变量
};
代码细节:

基于计算器任务的Task
我们得创建一个task.hpp,里面定义一个CalTask类:
class CalTask
{
public:using func_t =std::function<int(int,int,char)>;CalTask(){}CalTask(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func){}std::string operator()(){int result =_callback(_x,_y,_op);char buffer[64];snprintf(buffer,sizeof buffer,"%d %c %d =%d",_x,_op,_y,result);return buffer;}std::string to_string(){char buffer[64];snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);return buffer;}~CalTask(){}private:int _x;int _y;char _op;func_t _callback;};
生产者任务:
void* productor(void* args)
{BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);while(true){int x =rand()%10;int y =rand()%10;char op =op_str[rand()%op_str.size()];CalTask task(x,y,op,mymath);bq->push(task);cout<<"productor task:"<<task.to_string()<<endl;//sleep(1);}return nullptr;
}
消费者任务:
void* consumer(void* args)
{BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);while(true){CalTask t;bq->pop(&t);string result =t();cout<<"consumer result:"<<result<<endl;sleep(1);}return nullptr;
}
基于两个阻塞队列实现计算与存储:
将计算和存储的两个队列放进同一个类中:
template<class c,class s>
class BlockQueues
{public:BlockQueue<c>* c_bq; BlockQueue<s>* s_bq;
};
main函数:
int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueues<CalTask,SaveTask> bqs;bqs.c_bq =new BlockQueue<CalTask>;bqs.s_bq =new BlockQueue<SaveTask>;//BlockQueue<CalTask>* bqs =new BlockQueue<CalTask>(5);pthread_t c,p,s;pthread_create(&c,nullptr,consumer,(void*)&bqs);pthread_create(&p,nullptr,productor,(void*)&bqs);pthread_create(&s,nullptr,saver,(void*)&bqs);pthread_join(c,nullptr);pthread_join(p,nullptr);pthread_join(s,nullptr);delete bqs.c_bq;delete bqs.s_bq;return 0;
}
存储任务:
class SaveTask
{typedef std::function<void(const std::string&)> func_t;
public:SaveTask(){}SaveTask(const std::string &message, func_t func): _message(message), _func(func){}void operator()(){_func(_message);}
private:std::string _message;func_t _func;
};void Save(const std::string &message)
{const std::string target = "./log.txt";FILE *fp = fopen(target.c_str(), "a+");if(!fp){std::cerr << "fopen error" << std::endl;return;}fputs(message.c_str(), fp);fputs("\n", fp);fclose(fp);
}
存储线程执行的任务:
void *saver(void *bqs_)
{BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;while(true){SaveTask t;save_bq->pop(&t);t();std::cout << "save thread,保存任务完成..." << std::endl; }return nullptr;
}
代码的执行结果:

生产者与消费者模型优势总结:
这个模型的优势不在于多个线程能够并发式的对共享资源里面的数据进行访问和处理,而是多个线程能够在加载任务和处理任务的时候进行并发处理。

在我们上面的代码逻辑中很简单,就是随机构造x和y构成一个任务最后放入阻塞队列中。而在实际情况中加载任务的时候没有那么简单,有时需要从网络或者数据库中加载,这就需要消耗很长的一段时间。这个模型的优势就是在于多个生产者能同时加载多个任务,随后竞争出一名生产者将任务放入共享资源(阻塞队列)中,然后在竞争出一名消费者取出任务。因此模型的优势不在于多个线程能并发的从阻塞队列中拿数据处理,而是在加载任务的时候做到并发节省时间。同理处理任务,当一个线程处理任务的同时,另一个线程仍可以从阻塞队列取出任务处理,不影响之前的进程处理任务,因此消费者处理任务的环节也做到了并发,实现了加载任务和处理任务的解耦,提高了整个程序的运行效率和速度。
到这里本章的内容就全部结束了,创作不易,希望大家多多点赞支持。
相关文章:
基于blockqueue的生产和消费模型
线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念: 同步:在保证数据安全的条件下,让线程按某种特定的顺序依次访问临界资源。 通过上一节的代码我们实现了一个多线程抢票的程序,但结果显示…...
Editors(Vim)
文章目录 Editors(Vim)学哪一个编辑器?Vim Philosophy of VimModal editing 模态编辑Basics 基础知识Inserting text 插入文本Buffers, tabs, and windows 缓冲区、选项卡和窗口Command-line 命令行 Vim’s interface is a programming language. Vim的接口是一种编…...
【Leetcode】134.加油站
一、题目 1、题目描述 在一条环路上有 n 个加油站,其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车,从第 i 个加油站开往第 i+1 个加油站需要消耗汽油 cost[i] 升。你从其中的一个加油站出发,开始时油箱为空。 给定两个整数数组 gas 和 cost,如果你…...
设计模式-建造者(生成器)模式
文章目录 简介建造者模式的核心概念产品(Product)建造者(Builder)指挥者(Director)建造者模式与其他设计模式的关系工厂模式和建造者模式uml对比 建造者模式的实现步骤建造者模式的应用场景spring中应用 建…...
内存泄露排查思路
1、泄露情况 启动闪退运行一段时间宕机 2、排查步骤 获取堆内存快照dump使用VisualVM分析dump文件通过查看堆信息的情况,定位内存溢出问题 jmap -dump:formatb,fileheap.hprof pid -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPath输出路径 3、在VisualVM中分…...
kafka学习-概念与简单实战
目录 1、核心概念 消息和批次 Topic和Partition Replicas Offset broker和集群 生产者和消费者 2、开发实战 2.1、消息发送 介绍 代码实现 2.2、消息消费 介绍 代码实现 2.3、SpringBoot Kafka pom application.yaml KafkaConfig producer consumer 1、核心…...
爬虫进阶-反爬破解5(selenium的优势和点击操作+chrome的远程调试能力+通过Chrome隔离实现一台电脑登陆多个账号)
目录 一、selenium的优势和点击操作 二、chrome的远程调试能力 三、通过Chrome隔离实现一台电脑登陆多个账号 一、selenium的优势和点击操作 1.环境搭建 工具:Chrome浏览器chromedriverselenium win用户:chromedriver.exe放在python.exe旁边 MacO…...
音视频编码格式-AAC ADT
例子:config 1408 1408(16进制) : 0001 0100 0000 1000 audioObjectType(5bit)为 00010 , 即 2, profie (audioObjectType -1 ) AAC LC samplingFrequencyIndex (4bit) 为 1000 , 即 8 , 对应的采样频率为 16000 channelConfiguration (…...
【计算机网络】网络编程接口 Socket API 解读(3)
Socket 是网络协议栈暴露给编程人员的 API,相比复杂的计算机网络协议,API 对关键操作和配置数据进行了抽象,简化了程序编程。 本文讲述的 socket 内容源自 Linux 发行版 centos 9 上的 man 工具,和其他平台(比如 os-x …...
kafka知识小结
1.为什么分区数只能增加,不能减少? 按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。 另外实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理? 如果随着分区一起消失则消息的可靠性得不到保障; 如果需要保留则又需…...
算法刷题记录-DP(LeetCode)
746. Min Cost Climbing Stairs 代码 int minCostClimbingStairs(vector<int>& cost) {if (cost.size()<2){return 0;}int cache[cost.size()1];cache[0]0;cache[1]0;for (int i 2; i < cost.size(); i) {cache[i] min(cache[i-2]cost[i-2],cache[i-1]cost[i…...
Springboot整合Neo4J图数据库
1.引入依赖 JDK11, neo4J4.4.23 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent …...
Unity 2018发布在iOS 16.3偶尔出现画面不动的问题
1)Unity 2018发布在iOS 16.3偶尔出现画面不动的问题 2)IL2CPP在Xcode下增量编译问题 3)帧同步实现PuppetMaster布娃娃系统的问题 这是第351篇UWA技术知识分享的推送,精选了UWA社区的热门话题,涵盖了UWA问答、社区帖子等…...
蠕虫病毒流量分析案例
背景 某供排水集团的网络管理员对其网络的健康状况持认可态度,表示网络运行正常,没有发现异常行为。然而,由于网络环境变得越来越复杂,仅凭借传统的网络经验已经不能全面了解网络情况。因此,我们为供排水集团安装了Ne…...
Transformer(一)—— Attention Batch Normalization
Transformer详解 一、RNN循环神经网络二、seq2seq模型三、Attention(注意力机制)四、Transformer4.1 self attention4.2 self-attention的变形——Multi-head Self-attention4.3 Masked Attention4.4 Positional Encoding4.5 Batch Normalization4.6 Lay…...
2023高教社杯数学建模C题思路代码 - 蔬菜类商品的自动定价与补货决策
# 1 赛题 在生鲜商超中,一般蔬菜类商品的保鲜期都比较短,且品相随销售时间的增加而变差, 大部分品种如当日未售出,隔日就无法再售。因此, 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬菜…...
【C++漂流记】一文搞懂类与对象的封装
本篇文章主要说明了类与对象中封装的有关知识,包括属性和行为作为整体、访问权限、class与struct的区别、成员属性的私有化,希望这篇文章可以帮助你更好的了解类与对象这方面的知识。 文章目录 一、属性和行为作为整体二、访问权限三、class与struct的区…...
ctfshow 反序列化
PHP反序列化前置知识 序列化和反序列化 对象是不能在字节流中传输的,序列化就是把对象转化为字符串以便存储和传输,反序列化就是将字符串转化为对象 魔术方法 __construct() //构造,当对象new时调用 __wakeup() //执行unserialize()时&am…...
数据结构:线性表之-单向链表(无头)
目录 什么是单向链表 顺序表和链表的区别和联系 顺序表: 链表: 链表表示(单项)和实现 1.1 链表的概念及结构 1.2单链表(无头)的实现 所用文件 将有以下功能: 链表定义 创建新链表元素 尾插 头插 尾删 头删 查找-给一个节点的…...
为IT服务台构建自定义Zia操作
Zia是manageengine的商业人工智能助手,是ServiceDesk Plus Cloud的虚拟会话支持代理。使用Zia,您可以优化帮助台管理,还可以缩小最终用户与其帮助台之间的差距,Zia通过执行预配置的操作来帮助用户完成他们的服务台任务。 例如&…...
树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
华为OD机考-机房布局
import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...
【FTP】ftp文件传输会丢包吗?批量几百个文件传输,有一些文件没有传输完整,如何解决?
FTP(File Transfer Protocol)本身是一个基于 TCP 的协议,理论上不会丢包。但 FTP 文件传输过程中仍可能出现文件不完整、丢失或损坏的情况,主要原因包括: ✅ 一、FTP传输可能“丢包”或文件不完整的原因 原因描述网络…...
从实验室到产业:IndexTTS 在六大核心场景的落地实践
一、内容创作:重构数字内容生产范式 在短视频创作领域,IndexTTS 的语音克隆技术彻底改变了配音流程。B 站 UP 主通过 5 秒参考音频即可克隆出郭老师音色,生成的 “各位吴彦祖们大家好” 语音相似度达 97%,单条视频播放量突破百万…...
leetcode_69.x的平方根
题目如下 : 看到题 ,我们最原始的想法就是暴力解决: for(long long i 0;i<INT_MAX;i){if(i*ix){return i;}else if((i*i>x)&&((i-1)*(i-1)<x)){return i-1;}}我们直接开始遍历,我们是整数的平方根,所以我们分两…...
嵌入式面试常问问题
以下内容面向嵌入式/系统方向的初学者与面试备考者,全面梳理了以下几大板块,并在每个板块末尾列出常见的面试问答思路,帮助你既能夯实基础,又能应对面试挑战。 一、TCP/IP 协议 1.1 TCP/IP 五层模型概述 链路层(Link Layer) 包括网卡驱动、以太网、Wi‑Fi、PPP 等。负责…...
理想汽车5月交付40856辆,同比增长16.7%
6月1日,理想汽车官方宣布,5月交付新车40856辆,同比增长16.7%。截至2025年5月31日,理想汽车历史累计交付量为1301531辆。 官方表示,理想L系列智能焕新版在5月正式发布,全系产品力有显著的提升,每…...
【Vue】scoped+组件通信+props校验
【scoped作用及原理】 【作用】 默认写在组件中style的样式会全局生效, 因此很容易造成多个组件之间的样式冲突问题 故而可以给组件加上scoped 属性, 令样式只作用于当前组件的标签 作用:防止不同vue组件样式污染 【原理】 给组件加上scoped 属性后…...
