当前位置: 首页 > news >正文

基于blockqueue的生产和消费模型

线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念:

同步:在保证数据安全的条件下,让线程按某种特定的顺序依次访问临界资源。

通过上一节的代码我们实现了一个多线程抢票的程序,但结果显示的是一个线程在疯狂的抢票,这就导致了其他线程抢占不到临界资源而导致的饥饿问题。

所以在抢代码的代码逻辑中,我们需要保证各个线程以同步的方式进行抢票。那如何实现呢?就需要用到信号量:

条件变量的接口函数:

条件变量的使用:

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>int tickets = 1000;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *start_routine(void *args)
{std::string name = static_cast<const char *>(args);while (true){pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex); // 为什么要有mutex,后面就说// 判断暂时省略std::cout << name << " -> " << tickets << std::endl;tickets--;pthread_mutex_unlock(&mutex);}
}int main()
{// 通过条件变量控制线程的执行pthread_t t[5];for (int i = 0; i < 5; i++){char *name = new char[64];snprintf(name, 64, "thread %d", i + 1);pthread_create(t+i, nullptr, start_routine, name);}while (true){sleep(1);// pthread_cond_signal(&cond);pthread_cond_broadcast(&cond);std::cout << "main thread wakeup one thread..." << std::endl;}for (int i = 0; i < 5; i++){pthread_join(t[i], nullptr);}return 0;
}

条件变量的理解:

我们用一个例子来说明:

没有条件变量就好比是面试官在一个办公室面试,只要应聘者得到门外挂着的钥匙进来就可以面试。这时一个应聘者抢到了钥匙进入了办公室,面试结束后想要将钥匙归还时感觉自己面的不好,这时自己离钥匙也是最近的,所以刚出门又拿起了钥匙继续面试。这钥匙就导致了其他应聘者拿不到而这个应聘者在疯狂的面试,很明显这不合理。

但是有了条件变量就好比是面试官在外面设置了一个等待区,面试完的人必须归还钥匙并且再次回到等待队列的最末尾重新排队,这就保证了各个应聘者都有机会面试。

 条件不满足的时候,线程必须去某些定义好的条件变量上进行等待

pthread_cond_wait第二个参数的意义(将锁传进去):该函数调用的时候,会以原子性的方式,将锁释放,并将线程挂起等待。用pthread_cond_sign将线程唤醒时,该线程会拿着你所传入的锁将代码继续向下运行。通过以上知识的铺垫让我们进入今天的正题:

生产者消费者模型

什么是生产者消费者模型呢?我们用一个形象的图来说明:

生产者消费者模型就是生产者可以将生产的数据存放在一个共享区,消费者从共享区中获得数据进行消费。由于是共享区,就要保证数据的安全性。当生产者生产一个数据时,另一个生产者不能在同一个数据上生产,不然会导致数据的不安全性。因此生产者和生产者之间是互斥关系的。同样消费者也不能同时消费同一个数据,不然可能会导致数据的不一致性,因此消费者和消费者之间是互斥关系的。如果这时一个生产者正在生产一个数据,而同时消费者也正在消费这个数据。就可能导致数据还没生产完全就已经被消费了,也会导致数据的不安全性。因此生产者和消费者之间是互斥关系,同时我们还希望生产者生产以后就有消费者来消费,消费者消费完一个就有生产者来生产,因此生产者和消费者之间是同步关系。

总结一下就是3种关系、两种角色、一个交易场所:

这一段特殊的缓冲区可以提前存放一批数据,这样消费者想消费的时候消费,生产者想什么时候生产就什么时候生产。解决了生产和消费两批线程忙闲不均的问题,是它们不具有强耦合的关系。

基于blockqueue的生产和消费模型

基于普通方式处理数据

代码实现:

//Main.cc#include <iostream>
#include <unistd.h>
#include <time.h>
#include "BlockQueue.hpp"using namespace std;void* consumer(void* args)
{BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);while(true){int val =0;bq->pop(&val);cout<<"我是消费者,我消费了一个数字:"<<val<<endl;sleep(1);}return nullptr;
}void* productor(void* args)
{BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);while(true){int val =rand()%10;bq->push(val);cout<<"我是生产者,我生产了一个数字:"<<val<<endl;//sleep(1);}return nullptr;
}int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueue<int>* queue =new BlockQueue<int>(5);pthread_t c,p;pthread_create(&c,nullptr,consumer,(void*)queue);pthread_create(&p,nullptr,productor,(void*)queue);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}//BlockQueue.hpp
#include <queue>
#include <pthread.h>
#include <iostream>
using namespace std;const int NUM =5;
template<class T>
class BlockQueue
{
public:BlockQueue(const int numsize =NUM):_maxsize(NUM){pthread_mutex_init(&_lock,nullptr);pthread_cond_init(&_ccond,nullptr);pthread_cond_init(&_ccond,nullptr);}void push(const T& in){pthread_mutex_lock(&_lock);while(is_Full()){//这里说明阻塞队列是满的,需要让生产者等待pthread_cond_wait(&_pcond,&_lock);}//这里说明阻塞队列至少有一个空位可以插入_queue.push(in);//唤醒消费者去消费pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_lock);}void pop(T* out){pthread_mutex_lock(&_lock);while(is_Empty()){//这里说明阻塞队列是空的,需要让消费者等待pthread_cond_wait(&_ccond,&_lock);}//这里说明阻塞队列至少有一个数据*out=_queue.front();_queue.pop();//唤醒生产者生产数据pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_lock);}~BlockQueue(){pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_ccond);pthread_cond_destroy(&_pcond);}private:bool is_Full(){return _queue.size()==_maxsize;}bool is_Empty(){return _queue.empty();}private:queue<T> _queue;int _maxsize;pthread_mutex_t _lock;   //保护临界资源的锁pthread_cond_t _ccond;   //消费者的条件变量pthread_cond_t _pcond;   //生产者的条件变量
};

代码细节:

基于计算器任务的Task

我们得创建一个task.hpp,里面定义一个CalTask类:

class CalTask
{
public:using func_t =std::function<int(int,int,char)>;CalTask(){}CalTask(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func){}std::string operator()(){int result =_callback(_x,_y,_op);char buffer[64];snprintf(buffer,sizeof buffer,"%d %c %d =%d",_x,_op,_y,result);return buffer;}std::string to_string(){char buffer[64];snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);return buffer;}~CalTask(){}private:int _x;int _y;char _op;func_t _callback;};

生产者任务:

void* productor(void* args)
{BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);while(true){int x =rand()%10;int y =rand()%10;char op =op_str[rand()%op_str.size()];CalTask task(x,y,op,mymath);bq->push(task);cout<<"productor task:"<<task.to_string()<<endl;//sleep(1);}return nullptr;
}

消费者任务:

void* consumer(void* args)
{BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);while(true){CalTask t;bq->pop(&t);string result =t();cout<<"consumer result:"<<result<<endl;sleep(1);}return nullptr;
}


基于两个阻塞队列实现计算与存储:

将计算和存储的两个队列放进同一个类中:

template<class c,class s>
class BlockQueues
{public:BlockQueue<c>* c_bq; BlockQueue<s>* s_bq;
};

main函数:

int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueues<CalTask,SaveTask> bqs;bqs.c_bq =new BlockQueue<CalTask>;bqs.s_bq =new BlockQueue<SaveTask>;//BlockQueue<CalTask>* bqs =new BlockQueue<CalTask>(5);pthread_t c,p,s;pthread_create(&c,nullptr,consumer,(void*)&bqs);pthread_create(&p,nullptr,productor,(void*)&bqs);pthread_create(&s,nullptr,saver,(void*)&bqs);pthread_join(c,nullptr);pthread_join(p,nullptr);pthread_join(s,nullptr);delete bqs.c_bq;delete bqs.s_bq;return 0;
}

存储任务:

class SaveTask
{typedef std::function<void(const std::string&)> func_t;
public:SaveTask(){}SaveTask(const std::string &message, func_t func): _message(message), _func(func){}void operator()(){_func(_message);}
private:std::string _message;func_t _func;
};void Save(const std::string &message)
{const std::string target = "./log.txt";FILE *fp = fopen(target.c_str(), "a+");if(!fp){std::cerr << "fopen error" << std::endl;return;}fputs(message.c_str(), fp);fputs("\n", fp);fclose(fp);
}

存储线程执行的任务:

void *saver(void *bqs_)
{BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;while(true){SaveTask t;save_bq->pop(&t);t();std::cout << "save thread,保存任务完成..." << std::endl; }return nullptr;
}

代码的执行结果:

生产者与消费者模型优势总结:

这个模型的优势不在于多个线程能够并发式的对共享资源里面的数据进行访问和处理,而是多个线程能够在加载任务处理任务的时候进行并发处理。

在我们上面的代码逻辑中很简单,就是随机构造x和y构成一个任务最后放入阻塞队列中。而在实际情况中加载任务的时候没有那么简单,有时需要从网络或者数据库中加载,这就需要消耗很长的一段时间。这个模型的优势就是在于多个生产者能同时加载多个任务,随后竞争出一名生产者将任务放入共享资源(阻塞队列)中,然后在竞争出一名消费者取出任务。因此模型的优势不在于多个线程能并发的从阻塞队列中拿数据处理,而是在加载任务的时候做到并发节省时间。同理处理任务,当一个线程处理任务的同时,另一个线程仍可以从阻塞队列取出任务处理,不影响之前的进程处理任务,因此消费者处理任务的环节也做到了并发,实现了加载任务和处理任务的解耦,提高了整个程序的运行效率和速度。

到这里本章的内容就全部结束了,创作不易,希望大家多多点赞支持。

相关文章:

基于blockqueue的生产和消费模型

线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念&#xff1a; 同步&#xff1a;在保证数据安全的条件下&#xff0c;让线程按某种特定的顺序依次访问临界资源。 通过上一节的代码我们实现了一个多线程抢票的程序&#xff0c;但结果显示…...

Editors(Vim)

文章目录 Editors(Vim)学哪一个编辑器&#xff1f;Vim Philosophy of VimModal editing 模态编辑Basics 基础知识Inserting text 插入文本Buffers, tabs, and windows 缓冲区、选项卡和窗口Command-line 命令行 Vim’s interface is a programming language. Vim的接口是一种编…...

【Leetcode】134.加油站

一、题目 1、题目描述 在一条环路上有 n 个加油站,其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车,从第 i 个加油站开往第 i+1 个加油站需要消耗汽油 cost[i] 升。你从其中的一个加油站出发,开始时油箱为空。 给定两个整数数组 gas 和 cost,如果你…...

设计模式-建造者(生成器)模式

文章目录 简介建造者模式的核心概念产品&#xff08;Product&#xff09;建造者&#xff08;Builder&#xff09;指挥者&#xff08;Director&#xff09;建造者模式与其他设计模式的关系工厂模式和建造者模式uml对比 建造者模式的实现步骤建造者模式的应用场景spring中应用 建…...

内存泄露排查思路

1、泄露情况 启动闪退运行一段时间宕机 2、排查步骤 获取堆内存快照dump使用VisualVM分析dump文件通过查看堆信息的情况&#xff0c;定位内存溢出问题 jmap -dump:formatb,fileheap.hprof pid -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPath输出路径 3、在VisualVM中分…...

kafka学习-概念与简单实战

目录 1、核心概念 消息和批次 Topic和Partition Replicas Offset broker和集群 生产者和消费者 2、开发实战 2.1、消息发送 介绍 代码实现 2.2、消息消费 介绍 代码实现 2.3、SpringBoot Kafka pom application.yaml KafkaConfig producer consumer 1、核心…...

爬虫进阶-反爬破解5(selenium的优势和点击操作+chrome的远程调试能力+通过Chrome隔离实现一台电脑登陆多个账号)

目录 一、selenium的优势和点击操作 二、chrome的远程调试能力 三、通过Chrome隔离实现一台电脑登陆多个账号 一、selenium的优势和点击操作 1.环境搭建 工具&#xff1a;Chrome浏览器chromedriverselenium win用户&#xff1a;chromedriver.exe放在python.exe旁边 MacO…...

音视频编码格式-AAC ADT

例子:config 1408 1408(16进制) : 0001 0100 0000 1000 audioObjectType&#xff08;5bit&#xff09;为 00010 , 即 2&#xff0c; profie (audioObjectType -1 ) AAC LC samplingFrequencyIndex (4bit) 为 1000 , 即 8 , 对应的采样频率为 16000 channelConfiguration (…...

【计算机网络】网络编程接口 Socket API 解读(3)

Socket 是网络协议栈暴露给编程人员的 API&#xff0c;相比复杂的计算机网络协议&#xff0c;API 对关键操作和配置数据进行了抽象&#xff0c;简化了程序编程。 本文讲述的 socket 内容源自 Linux 发行版 centos 9 上的 man 工具&#xff0c;和其他平台&#xff08;比如 os-x …...

kafka知识小结

1.为什么分区数只能增加,不能减少? 按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。 另外实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理? 如果随着分区一起消失则消息的可靠性得不到保障; 如果需要保留则又需…...

算法刷题记录-DP(LeetCode)

746. Min Cost Climbing Stairs 代码 int minCostClimbingStairs(vector<int>& cost) {if (cost.size()<2){return 0;}int cache[cost.size()1];cache[0]0;cache[1]0;for (int i 2; i < cost.size(); i) {cache[i] min(cache[i-2]cost[i-2],cache[i-1]cost[i…...

Springboot整合Neo4J图数据库

1.引入依赖 JDK11&#xff0c; neo4J4.4.23 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent …...

Unity 2018发布在iOS 16.3偶尔出现画面不动的问题

1&#xff09;Unity 2018发布在iOS 16.3偶尔出现画面不动的问题 2&#xff09;IL2CPP在Xcode下增量编译问题 3&#xff09;帧同步实现PuppetMaster布娃娃系统的问题 这是第351篇UWA技术知识分享的推送&#xff0c;精选了UWA社区的热门话题&#xff0c;涵盖了UWA问答、社区帖子等…...

蠕虫病毒流量分析案例

背景 某供排水集团的网络管理员对其网络的健康状况持认可态度&#xff0c;表示网络运行正常&#xff0c;没有发现异常行为。然而&#xff0c;由于网络环境变得越来越复杂&#xff0c;仅凭借传统的网络经验已经不能全面了解网络情况。因此&#xff0c;我们为供排水集团安装了Ne…...

Transformer(一)—— Attention Batch Normalization

Transformer详解 一、RNN循环神经网络二、seq2seq模型三、Attention&#xff08;注意力机制&#xff09;四、Transformer4.1 self attention4.2 self-attention的变形——Multi-head Self-attention4.3 Masked Attention4.4 Positional Encoding4.5 Batch Normalization4.6 Lay…...

2023高教社杯数学建模C题思路代码 - 蔬菜类商品的自动定价与补货决策

# 1 赛题 在生鲜商超中&#xff0c;一般蔬菜类商品的保鲜期都比较短&#xff0c;且品相随销售时间的增加而变差&#xff0c; 大部分品种如当日未售出&#xff0c;隔日就无法再售。因此&#xff0c; 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬菜…...

【C++漂流记】一文搞懂类与对象的封装

本篇文章主要说明了类与对象中封装的有关知识&#xff0c;包括属性和行为作为整体、访问权限、class与struct的区别、成员属性的私有化&#xff0c;希望这篇文章可以帮助你更好的了解类与对象这方面的知识。 文章目录 一、属性和行为作为整体二、访问权限三、class与struct的区…...

ctfshow 反序列化

PHP反序列化前置知识 序列化和反序列化 对象是不能在字节流中传输的&#xff0c;序列化就是把对象转化为字符串以便存储和传输&#xff0c;反序列化就是将字符串转化为对象 魔术方法 __construct() //构造&#xff0c;当对象new时调用 __wakeup() //执行unserialize()时&am…...

数据结构:线性表之-单向链表(无头)

目录 什么是单向链表 顺序表和链表的区别和联系 顺序表&#xff1a; 链表&#xff1a; 链表表示(单项)和实现 1.1 链表的概念及结构 1.2单链表(无头)的实现 所用文件 将有以下功能&#xff1a; 链表定义 创建新链表元素 尾插 头插 尾删 头删 查找-给一个节点的…...

为IT服务台构建自定义Zia操作

Zia是manageengine的商业人工智能助手&#xff0c;是ServiceDesk Plus Cloud的虚拟会话支持代理。使用Zia&#xff0c;您可以优化帮助台管理&#xff0c;还可以缩小最终用户与其帮助台之间的差距&#xff0c;Zia通过执行预配置的操作来帮助用户完成他们的服务台任务。 例如&…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

3403. 从盒子中找出字典序最大的字符串 I

3403. 从盒子中找出字典序最大的字符串 I 题目链接&#xff1a;3403. 从盒子中找出字典序最大的字符串 I 代码如下&#xff1a; class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题

在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件&#xff0c;这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下&#xff0c;实现高效测试与快速迭代&#xff1f;这一命题正考验着…...

纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join

纯 Java 项目&#xff08;非 SpringBoot&#xff09;集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...

【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案

目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后&#xff0c;迭代器会失效&#xff0c;因为顺序迭代器在内存中是连续存储的&#xff0c;元素删除后&#xff0c;后续元素会前移。 但一些场景中&#xff0c;我们又需要在执行删除操作…...

Python 高效图像帧提取与视频编码:实战指南

Python 高效图像帧提取与视频编码:实战指南 在音视频处理领域,图像帧提取与视频编码是基础但极具挑战性的任务。Python 结合强大的第三方库(如 OpenCV、FFmpeg、PyAV),可以高效处理视频流,实现快速帧提取、压缩编码等关键功能。本文将深入介绍如何优化这些流程,提高处理…...

MySQL的pymysql操作

本章是MySQL的最后一章&#xff0c;MySQL到此完结&#xff0c;下一站Hadoop&#xff01;&#xff01;&#xff01; 这章很简单&#xff0c;完整代码在最后&#xff0c;详细讲解之前python课程里面也有&#xff0c;感兴趣的可以往前找一下 一、查询操作 我们需要打开pycharm …...

基于单片机的宠物屋智能系统设计与实现(论文+源码)

本设计基于单片机的宠物屋智能系统核心是实现对宠物生活环境及状态的智能管理。系统以单片机为中枢&#xff0c;连接红外测温传感器&#xff0c;可实时精准捕捉宠物体温变化&#xff0c;以便及时发现健康异常&#xff1b;水位检测传感器时刻监测饮用水余量&#xff0c;防止宠物…...

Axure Rp 11 安装、汉化、授权

Axure Rp 11 安装、汉化、授权 1、前言2、汉化2.1、汉化文件下载2.2、windows汉化流程2.3、 macOs汉化流程 3、授权 1、前言 Axure Rp 11官方下载链接&#xff1a;https://www.axure.com/downloadthanks 2、汉化 2.1、汉化文件下载 链接: https://pan.baidu.com/s/18Clf…...

NineData数据库DevOps功能全面支持百度智能云向量数据库 VectorDB,助力企业 AI 应用高效落地

NineData 的数据库 DevOps 解决方案已完成对百度智能云向量数据库 VectorDB 的全链路适配&#xff0c;成为国内首批提供 VectorDB 原生操作能力的服务商。此次合作聚焦 AI 开发核心场景&#xff0c;通过标准化 SQL 工作台与细粒度权限管控两大能力&#xff0c;助力企业安全高效…...