基于blockqueue的生产和消费模型
线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念:
同步:在保证数据安全的条件下,让线程按某种特定的顺序依次访问临界资源。
通过上一节的代码我们实现了一个多线程抢票的程序,但结果显示的是一个线程在疯狂的抢票,这就导致了其他线程抢占不到临界资源而导致的饥饿问题。
所以在抢代码的代码逻辑中,我们需要保证各个线程以同步的方式进行抢票。那如何实现呢?就需要用到信号量:
条件变量的接口函数:

条件变量的使用:
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>int tickets = 1000;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *start_routine(void *args)
{std::string name = static_cast<const char *>(args);while (true){pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex); // 为什么要有mutex,后面就说// 判断暂时省略std::cout << name << " -> " << tickets << std::endl;tickets--;pthread_mutex_unlock(&mutex);}
}int main()
{// 通过条件变量控制线程的执行pthread_t t[5];for (int i = 0; i < 5; i++){char *name = new char[64];snprintf(name, 64, "thread %d", i + 1);pthread_create(t+i, nullptr, start_routine, name);}while (true){sleep(1);// pthread_cond_signal(&cond);pthread_cond_broadcast(&cond);std::cout << "main thread wakeup one thread..." << std::endl;}for (int i = 0; i < 5; i++){pthread_join(t[i], nullptr);}return 0;
}
条件变量的理解:
我们用一个例子来说明:
没有条件变量就好比是面试官在一个办公室面试,只要应聘者得到门外挂着的钥匙进来就可以面试。这时一个应聘者抢到了钥匙进入了办公室,面试结束后想要将钥匙归还时感觉自己面的不好,这时自己离钥匙也是最近的,所以刚出门又拿起了钥匙继续面试。这钥匙就导致了其他应聘者拿不到而这个应聘者在疯狂的面试,很明显这不合理。
但是有了条件变量就好比是面试官在外面设置了一个等待区,面试完的人必须归还钥匙并且再次回到等待队列的最末尾重新排队,这就保证了各个应聘者都有机会面试。

条件不满足的时候,线程必须去某些定义好的条件变量上进行等待。
pthread_cond_wait第二个参数的意义(将锁传进去):该函数调用的时候,会以原子性的方式,将锁释放,并将线程挂起等待。用pthread_cond_sign将线程唤醒时,该线程会拿着你所传入的锁将代码继续向下运行。通过以上知识的铺垫让我们进入今天的正题:
生产者消费者模型
什么是生产者消费者模型呢?我们用一个形象的图来说明:

生产者消费者模型就是生产者可以将生产的数据存放在一个共享区,消费者从共享区中获得数据进行消费。由于是共享区,就要保证数据的安全性。当生产者生产一个数据时,另一个生产者不能在同一个数据上生产,不然会导致数据的不安全性。因此生产者和生产者之间是互斥关系的。同样消费者也不能同时消费同一个数据,不然可能会导致数据的不一致性,因此消费者和消费者之间是互斥关系的。如果这时一个生产者正在生产一个数据,而同时消费者也正在消费这个数据。就可能导致数据还没生产完全就已经被消费了,也会导致数据的不安全性。因此生产者和消费者之间是互斥关系,同时我们还希望生产者生产以后就有消费者来消费,消费者消费完一个就有生产者来生产,因此生产者和消费者之间是同步关系。
总结一下就是3种关系、两种角色、一个交易场所:

这一段特殊的缓冲区可以提前存放一批数据,这样消费者想消费的时候消费,生产者想什么时候生产就什么时候生产。解决了生产和消费两批线程忙闲不均的问题,是它们不具有强耦合的关系。
基于blockqueue的生产和消费模型

基于普通方式处理数据
代码实现:
//Main.cc#include <iostream>
#include <unistd.h>
#include <time.h>
#include "BlockQueue.hpp"using namespace std;void* consumer(void* args)
{BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);while(true){int val =0;bq->pop(&val);cout<<"我是消费者,我消费了一个数字:"<<val<<endl;sleep(1);}return nullptr;
}void* productor(void* args)
{BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);while(true){int val =rand()%10;bq->push(val);cout<<"我是生产者,我生产了一个数字:"<<val<<endl;//sleep(1);}return nullptr;
}int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueue<int>* queue =new BlockQueue<int>(5);pthread_t c,p;pthread_create(&c,nullptr,consumer,(void*)queue);pthread_create(&p,nullptr,productor,(void*)queue);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}//BlockQueue.hpp
#include <queue>
#include <pthread.h>
#include <iostream>
using namespace std;const int NUM =5;
template<class T>
class BlockQueue
{
public:BlockQueue(const int numsize =NUM):_maxsize(NUM){pthread_mutex_init(&_lock,nullptr);pthread_cond_init(&_ccond,nullptr);pthread_cond_init(&_ccond,nullptr);}void push(const T& in){pthread_mutex_lock(&_lock);while(is_Full()){//这里说明阻塞队列是满的,需要让生产者等待pthread_cond_wait(&_pcond,&_lock);}//这里说明阻塞队列至少有一个空位可以插入_queue.push(in);//唤醒消费者去消费pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_lock);}void pop(T* out){pthread_mutex_lock(&_lock);while(is_Empty()){//这里说明阻塞队列是空的,需要让消费者等待pthread_cond_wait(&_ccond,&_lock);}//这里说明阻塞队列至少有一个数据*out=_queue.front();_queue.pop();//唤醒生产者生产数据pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_lock);}~BlockQueue(){pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_ccond);pthread_cond_destroy(&_pcond);}private:bool is_Full(){return _queue.size()==_maxsize;}bool is_Empty(){return _queue.empty();}private:queue<T> _queue;int _maxsize;pthread_mutex_t _lock; //保护临界资源的锁pthread_cond_t _ccond; //消费者的条件变量pthread_cond_t _pcond; //生产者的条件变量
};
代码细节:

基于计算器任务的Task
我们得创建一个task.hpp,里面定义一个CalTask类:
class CalTask
{
public:using func_t =std::function<int(int,int,char)>;CalTask(){}CalTask(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func){}std::string operator()(){int result =_callback(_x,_y,_op);char buffer[64];snprintf(buffer,sizeof buffer,"%d %c %d =%d",_x,_op,_y,result);return buffer;}std::string to_string(){char buffer[64];snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);return buffer;}~CalTask(){}private:int _x;int _y;char _op;func_t _callback;};
生产者任务:
void* productor(void* args)
{BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);while(true){int x =rand()%10;int y =rand()%10;char op =op_str[rand()%op_str.size()];CalTask task(x,y,op,mymath);bq->push(task);cout<<"productor task:"<<task.to_string()<<endl;//sleep(1);}return nullptr;
}
消费者任务:
void* consumer(void* args)
{BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);while(true){CalTask t;bq->pop(&t);string result =t();cout<<"consumer result:"<<result<<endl;sleep(1);}return nullptr;
}
基于两个阻塞队列实现计算与存储:
将计算和存储的两个队列放进同一个类中:
template<class c,class s>
class BlockQueues
{public:BlockQueue<c>* c_bq; BlockQueue<s>* s_bq;
};
main函数:
int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueues<CalTask,SaveTask> bqs;bqs.c_bq =new BlockQueue<CalTask>;bqs.s_bq =new BlockQueue<SaveTask>;//BlockQueue<CalTask>* bqs =new BlockQueue<CalTask>(5);pthread_t c,p,s;pthread_create(&c,nullptr,consumer,(void*)&bqs);pthread_create(&p,nullptr,productor,(void*)&bqs);pthread_create(&s,nullptr,saver,(void*)&bqs);pthread_join(c,nullptr);pthread_join(p,nullptr);pthread_join(s,nullptr);delete bqs.c_bq;delete bqs.s_bq;return 0;
}
存储任务:
class SaveTask
{typedef std::function<void(const std::string&)> func_t;
public:SaveTask(){}SaveTask(const std::string &message, func_t func): _message(message), _func(func){}void operator()(){_func(_message);}
private:std::string _message;func_t _func;
};void Save(const std::string &message)
{const std::string target = "./log.txt";FILE *fp = fopen(target.c_str(), "a+");if(!fp){std::cerr << "fopen error" << std::endl;return;}fputs(message.c_str(), fp);fputs("\n", fp);fclose(fp);
}
存储线程执行的任务:
void *saver(void *bqs_)
{BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;while(true){SaveTask t;save_bq->pop(&t);t();std::cout << "save thread,保存任务完成..." << std::endl; }return nullptr;
}
代码的执行结果:

生产者与消费者模型优势总结:
这个模型的优势不在于多个线程能够并发式的对共享资源里面的数据进行访问和处理,而是多个线程能够在加载任务和处理任务的时候进行并发处理。

在我们上面的代码逻辑中很简单,就是随机构造x和y构成一个任务最后放入阻塞队列中。而在实际情况中加载任务的时候没有那么简单,有时需要从网络或者数据库中加载,这就需要消耗很长的一段时间。这个模型的优势就是在于多个生产者能同时加载多个任务,随后竞争出一名生产者将任务放入共享资源(阻塞队列)中,然后在竞争出一名消费者取出任务。因此模型的优势不在于多个线程能并发的从阻塞队列中拿数据处理,而是在加载任务的时候做到并发节省时间。同理处理任务,当一个线程处理任务的同时,另一个线程仍可以从阻塞队列取出任务处理,不影响之前的进程处理任务,因此消费者处理任务的环节也做到了并发,实现了加载任务和处理任务的解耦,提高了整个程序的运行效率和速度。
到这里本章的内容就全部结束了,创作不易,希望大家多多点赞支持。
相关文章:
基于blockqueue的生产和消费模型
线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念: 同步:在保证数据安全的条件下,让线程按某种特定的顺序依次访问临界资源。 通过上一节的代码我们实现了一个多线程抢票的程序,但结果显示…...
Editors(Vim)
文章目录 Editors(Vim)学哪一个编辑器?Vim Philosophy of VimModal editing 模态编辑Basics 基础知识Inserting text 插入文本Buffers, tabs, and windows 缓冲区、选项卡和窗口Command-line 命令行 Vim’s interface is a programming language. Vim的接口是一种编…...
【Leetcode】134.加油站
一、题目 1、题目描述 在一条环路上有 n 个加油站,其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车,从第 i 个加油站开往第 i+1 个加油站需要消耗汽油 cost[i] 升。你从其中的一个加油站出发,开始时油箱为空。 给定两个整数数组 gas 和 cost,如果你…...
设计模式-建造者(生成器)模式
文章目录 简介建造者模式的核心概念产品(Product)建造者(Builder)指挥者(Director)建造者模式与其他设计模式的关系工厂模式和建造者模式uml对比 建造者模式的实现步骤建造者模式的应用场景spring中应用 建…...
内存泄露排查思路
1、泄露情况 启动闪退运行一段时间宕机 2、排查步骤 获取堆内存快照dump使用VisualVM分析dump文件通过查看堆信息的情况,定位内存溢出问题 jmap -dump:formatb,fileheap.hprof pid -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPath输出路径 3、在VisualVM中分…...
kafka学习-概念与简单实战
目录 1、核心概念 消息和批次 Topic和Partition Replicas Offset broker和集群 生产者和消费者 2、开发实战 2.1、消息发送 介绍 代码实现 2.2、消息消费 介绍 代码实现 2.3、SpringBoot Kafka pom application.yaml KafkaConfig producer consumer 1、核心…...
爬虫进阶-反爬破解5(selenium的优势和点击操作+chrome的远程调试能力+通过Chrome隔离实现一台电脑登陆多个账号)
目录 一、selenium的优势和点击操作 二、chrome的远程调试能力 三、通过Chrome隔离实现一台电脑登陆多个账号 一、selenium的优势和点击操作 1.环境搭建 工具:Chrome浏览器chromedriverselenium win用户:chromedriver.exe放在python.exe旁边 MacO…...
音视频编码格式-AAC ADT
例子:config 1408 1408(16进制) : 0001 0100 0000 1000 audioObjectType(5bit)为 00010 , 即 2, profie (audioObjectType -1 ) AAC LC samplingFrequencyIndex (4bit) 为 1000 , 即 8 , 对应的采样频率为 16000 channelConfiguration (…...
【计算机网络】网络编程接口 Socket API 解读(3)
Socket 是网络协议栈暴露给编程人员的 API,相比复杂的计算机网络协议,API 对关键操作和配置数据进行了抽象,简化了程序编程。 本文讲述的 socket 内容源自 Linux 发行版 centos 9 上的 man 工具,和其他平台(比如 os-x …...
kafka知识小结
1.为什么分区数只能增加,不能减少? 按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。 另外实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理? 如果随着分区一起消失则消息的可靠性得不到保障; 如果需要保留则又需…...
算法刷题记录-DP(LeetCode)
746. Min Cost Climbing Stairs 代码 int minCostClimbingStairs(vector<int>& cost) {if (cost.size()<2){return 0;}int cache[cost.size()1];cache[0]0;cache[1]0;for (int i 2; i < cost.size(); i) {cache[i] min(cache[i-2]cost[i-2],cache[i-1]cost[i…...
Springboot整合Neo4J图数据库
1.引入依赖 JDK11, neo4J4.4.23 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent …...
Unity 2018发布在iOS 16.3偶尔出现画面不动的问题
1)Unity 2018发布在iOS 16.3偶尔出现画面不动的问题 2)IL2CPP在Xcode下增量编译问题 3)帧同步实现PuppetMaster布娃娃系统的问题 这是第351篇UWA技术知识分享的推送,精选了UWA社区的热门话题,涵盖了UWA问答、社区帖子等…...
蠕虫病毒流量分析案例
背景 某供排水集团的网络管理员对其网络的健康状况持认可态度,表示网络运行正常,没有发现异常行为。然而,由于网络环境变得越来越复杂,仅凭借传统的网络经验已经不能全面了解网络情况。因此,我们为供排水集团安装了Ne…...
Transformer(一)—— Attention Batch Normalization
Transformer详解 一、RNN循环神经网络二、seq2seq模型三、Attention(注意力机制)四、Transformer4.1 self attention4.2 self-attention的变形——Multi-head Self-attention4.3 Masked Attention4.4 Positional Encoding4.5 Batch Normalization4.6 Lay…...
2023高教社杯数学建模C题思路代码 - 蔬菜类商品的自动定价与补货决策
# 1 赛题 在生鲜商超中,一般蔬菜类商品的保鲜期都比较短,且品相随销售时间的增加而变差, 大部分品种如当日未售出,隔日就无法再售。因此, 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬菜…...
【C++漂流记】一文搞懂类与对象的封装
本篇文章主要说明了类与对象中封装的有关知识,包括属性和行为作为整体、访问权限、class与struct的区别、成员属性的私有化,希望这篇文章可以帮助你更好的了解类与对象这方面的知识。 文章目录 一、属性和行为作为整体二、访问权限三、class与struct的区…...
ctfshow 反序列化
PHP反序列化前置知识 序列化和反序列化 对象是不能在字节流中传输的,序列化就是把对象转化为字符串以便存储和传输,反序列化就是将字符串转化为对象 魔术方法 __construct() //构造,当对象new时调用 __wakeup() //执行unserialize()时&am…...
数据结构:线性表之-单向链表(无头)
目录 什么是单向链表 顺序表和链表的区别和联系 顺序表: 链表: 链表表示(单项)和实现 1.1 链表的概念及结构 1.2单链表(无头)的实现 所用文件 将有以下功能: 链表定义 创建新链表元素 尾插 头插 尾删 头删 查找-给一个节点的…...
为IT服务台构建自定义Zia操作
Zia是manageengine的商业人工智能助手,是ServiceDesk Plus Cloud的虚拟会话支持代理。使用Zia,您可以优化帮助台管理,还可以缩小最终用户与其帮助台之间的差距,Zia通过执行预配置的操作来帮助用户完成他们的服务台任务。 例如&…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...
大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...
Go 语言接口详解
Go 语言接口详解 核心概念 接口定义 在 Go 语言中,接口是一种抽象类型,它定义了一组方法的集合: // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的: // 矩形结构体…...
剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
今日科技热点速览
🔥 今日科技热点速览 🎮 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售,主打更强图形性能与沉浸式体验,支持多模态交互,受到全球玩家热捧 。 🤖 人工智能持续突破 DeepSeek-R1&…...
什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
算法笔记2
1.字符串拼接最好用StringBuilder,不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...
