Linux操作系统7- 线程同步与互斥7(RingQueue环形队列生产者消费者模型改进)
上篇文章:Linux操作系统7- 线程同步与互斥6(POSIX信号量与环形队列生产者消费者模型)-CSDN博客
本篇代码仓库:myLerningCode/l36 · 橘子真甜/Linux操作系统与网络编程学习 - 码云 - 开源中国 (gitee.com)
目录
一. 单生产单消费单保存模型
1.1 RingQueue.hpp
1.2 Task.hpp
1.3 MainPC.cpp
1.4 测试
二. 多生产多消费模型
2.1 分析与代码
2.2 多生产多消费的意义
一. 单生产单消费单保存模型
通过RingQueue可以实现生产者消费者之间的协同工作,如果现在想要将消费者的输出结果保存在文件中应该怎么办?
可以定义两个环形队列,三个线程。让消费者充当第二个队列的生产者。
代码如下:
1.1 RingQueue.hpp
直接使用上篇文件的代码即可。然后我们需要新增一个类,这个类中包含两个环形队列用于消费者同时访问两个队列
#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
const int gnum = 10; // 阻塞队列的最大容量template <class T>
class RingQueue
{
private:void P(sem_t &sem){// P操作,申请信号量sem--int n = sem_wait(&sem);if (n < 0)std::cerr << "P操作失败" << std::endl;}void V(sem_t &sem){// V操作,释放信号量,sem++int n = sem_post(&sem);if (n < 0)std::cerr << "V操作失败" << std::endl;}public:RingQueue(const int &maxnum = gnum) : _maxnum(maxnum), _ringQueue(gnum){// 在构造函数中完成信号量的初始化int n = sem_init(&_spaceSem, 0, maxnum);if (n < 0)std::cerr << "spaceSem信号量初始化失败" << std::endl;n = sem_init(&_dataSem, 0, 0);if (n < 0)std::cerr << "dataSem信号量初始化失败" << std::endl;_producerStep = _consumerStep = 0;}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);}// 生产者插入数据void push(const T &in){// 申请空间资源P(_spaceSem);// 插入数据_ringQueue[_producerStep++] = in;_producerStep %= _maxnum;// 释放一个数据资源V(_dataSem);}// 消费者获取数据void pop(T *out){// 申请数据资源P(_dataSem);// 插入数据*out = _ringQueue[_consumerStep++];_consumerStep %= _maxnum;// 释放一个空间资源V(_spaceSem);}private:std::vector<T> _ringQueue; // 使用数组来实现环形队列size_t _maxnum;sem_t _spaceSem; // 生产者空间资源信号量sem_t _dataSem; // 消费者数据资源信号量int _producerStep; // 生产者下标int _consumerStep; // 消费者下标
};
1.2 Task.hpp
需要新增一个保存者的任务
#pragma once
#include <iostream>
#include <cstdio>
#include <functional>class CalTask
{using func_t = std::function<int(int, int, char)>; // func是一个函数// typedef std::function<int(int,int)> func;
public:CalTask() {}CalTask(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callback(func) {}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof(buffer) - 1, "%d %c %d = %d", _x, _op, _y, result);return buffer;}// 返回任务操作的结果std::string toString(){char buffer[1024];snprintf(buffer, sizeof(buffer) - 1, "%d %c %d = ?", _x, _op, _y);return buffer;}private:int _x;int _y;char _op; // 操作的任务的idfunc_t _callback; // 调用的函数
};class SaveTask
{typedef std::function<void(const std::string &)> func_t;public:SaveTask() {}SaveTask(const std::string &message, func_t func): _message(message), _callback(func) {}void operator()(){_callback(_message);}private:std::string _message; // 保存的信息func_t _callback; // 将信息写入文件中
};const std::string oper = "+-*/%";
int my_math(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero" << std::endl;return -1;}else{result = x / y;}break;}case '%':if (y == 0){std::cerr << "moved zero" << std::endl;return -1;}else{result = x % y;}break;default:break;}return result;
}void Save(const std::string &message)
{const std::string task_pwd = "./log.txt";FILE *fp = fopen(task_pwd.c_str(), "a+");if (nullptr == fp){std::cerr << "saver open error" << std::endl;return;}fputs(message.c_str(), fp);fputc('\n', fp);fclose(fp);
}
1.3 MainPC.cpp
#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>// 生产者
void *ProductorRoutine(void *args)
{RingQueue<CalTask> *cal_rq = (static_cast<RingQueues<CalTask, SaveTask> *>(args))->_cal_rq;while (true){int x = rand() % 20;int y = rand() % 50;const char op = oper[rand() % oper.size()];CalTask ct(x, y, op, my_math);cal_rq->push(ct);std::cout << "生产者生产任务:" << ct.toString() << " 并传递给消费者完成" << std::endl;}
}// 消费者
void *ConsumerRoutine(void *args)
{RingQueue<CalTask> *cal_rq = (static_cast<RingQueues<CalTask, SaveTask> *>(args))->_cal_rq;RingQueue<SaveTask> *save_rq = (static_cast<RingQueues<CalTask, SaveTask> *>(args))->_save_rq;while (true){CalTask ct;cal_rq->pop(&ct);std::string result = ct();std::cout << "消费者实现任务:" << result << " 实现完成!" << std::endl;SaveTask st(result, Save);save_rq->push(st);std::cout << "消费者传递任务:" << result << " 给保存者完成!" << std::endl;}
}// 保存者
void *SaverRoutine(void *args)
{RingQueue<SaveTask> *save_rq = (static_cast<RingQueues<CalTask, SaveTask> *>(args))->_save_rq;while (true){SaveTask st;save_rq->pop(&st);st();std::cout << "保存者保存任务完成!" << std::endl;}
}void test1()
{RingQueues<CalTask, SaveTask> *rqs = new RingQueues<CalTask, SaveTask>;rqs->_cal_rq = new RingQueue<CalTask>();rqs->_save_rq = new RingQueue<SaveTask>();pthread_t p, c, s;pthread_create(&p, nullptr, ProductorRoutine, rqs);pthread_create(&c, nullptr, ConsumerRoutine, rqs);pthread_create(&s, nullptr, SaverRoutine, rqs);pthread_join(p, nullptr);pthread_join(c, nullptr);pthread_join(s, nullptr);delete rqs;
}int main()
{srand((unsigned int)time(0) ^ getpid() ^ pthread_self());test1();return 0;
}
1.4 测试
运行结果如下:

二. 多生产多消费模型
2.1 分析与代码
RingQueue环形队列可以保证单个生产者和单个消费者之间的同步与互斥,如果现在有多个生产者和多个消费者的话。如何保证生产者之间的互斥?消费者者之间的互斥?
阻塞队列中,我们通过加锁的方式让同一时刻只能有一个生产者线程进入临界区或者一个消费者进入临界区。
而环形队列中, 通过信号量保证了生产者消费者之间的同步与互斥。如果想要保证消费者与消费者之间的互斥,生产者与生产者之间的互斥,也需要加锁保护
在RingQueue中添加两个成员变量,一个生产者互斥锁,一个消费者互斥锁。同时需要在构造函数中完成锁的初始化,析构函数中完成锁的销毁。
并且在push函数中加生产者锁,在pop函数中加消费者锁。以实现生产者与生产者之间的互斥和消费者与消费者之间的互斥。(本质是防止多个线程同时访问导致生产者下标或者消费者下标出现数据错误)
代码如下:
#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
const int gnum = 10; // 阻塞队列的最大容量template <class T>
class RingQueue
{
private:void P(sem_t &sem){// P操作,申请信号量sem--int n = sem_wait(&sem);if (n < 0)std::cerr << "P操作失败" << std::endl;}void V(sem_t &sem){// V操作,释放信号量,sem++int n = sem_post(&sem);if (n < 0)std::cerr << "V操作失败" << std::endl;}public:RingQueue(const int &maxnum = gnum) : _maxnum(maxnum), _ringQueue(gnum){// 在构造函数中完成信号量的初始化int n = sem_init(&_spaceSem, 0, maxnum);if (n < 0)std::cerr << "spaceSem信号量初始化失败" << std::endl;n = sem_init(&_dataSem, 0, 0);if (n < 0)std::cerr << "dataSem信号量初始化失败" << std::endl;_producerStep = _consumerStep = 0;// 初始化锁pthread_mutex_init(&_pmtx, nullptr);pthread_mutex_init(&_cmtx, nullptr);}~RingQueue(){// 销毁信号量与互斥锁sem_destroy(&_spaceSem);sem_destroy(&_dataSem);//pthread_mutex_destroy(&_cmtx);pthread_mutex_destroy(&_pmtx);}// 生产者插入数据void push(const T &in){// 生产者加锁,保证生产者与生产者之间的互斥pthread_mutex_lock(&_pmtx);// 申请空间资源P(_spaceSem);// 插入数据_ringQueue[_producerStep++] = in;_producerStep %= _maxnum;// 释放一个数据资源V(_dataSem);// 解锁pthread_mutex_unlock(&_pmtx);}// 消费者获取数据void pop(T *out){// 消费者加锁pthread_mutex_lock(&_cmtx);// 申请数据资源P(_dataSem);// 插入数据*out = _ringQueue[_consumerStep++];_consumerStep %= _maxnum;// 释放一个空间资源V(_spaceSem);// 解锁pthread_mutex_unlock(&_cmtx);}private:std::vector<T> _ringQueue; // 使用数组来实现环形队列size_t _maxnum;sem_t _spaceSem; // 生产者空间资源信号量sem_t _dataSem; // 消费者数据资源信号量int _producerStep; // 生产者下标int _consumerStep; // 消费者下标pthread_mutex_t _pmtx;pthread_mutex_t _cmtx;
};
MainPC.cpp
#include <iostream>
#include <memory>
#include <string>#include <unistd.h>
#include <pthread.h>
#include "RingQueue.hpp"
#include "Task.hpp"const std::string OP = "+-*/%";
void *producer(void *args)
{// 获取交易场所 - 阻塞队列RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(args);while (true){int x = rand() % 100;int y = rand() % 100;char op = OP[rand() % OP.size()];// 打印日志printf("生产者生产的数据:%d %c %d 并交给消费者计算\n", x, op, y);CalTask ct(x, y, op, my_math);rq->push(ct);}return nullptr;
}void *consumer(void *args)
{// 获取交易场所 - 生产消费阻塞队列,消费保存阻塞队列RingQueue<CalTask> *rq = static_cast<RingQueue<CalTask> *>(args);while (true){// 获取任务计算CalTask ct;rq->pop(&ct);std::string result = ct();std::cout << "消费者获取数据并计算:" << result << std::endl;}return nullptr;
}int main()
{srand((unsigned int)time(0) ^ getpid());// 建立任务队列和保存队列RingQueue<CalTask> *rq = new RingQueue<CalTask>;pthread_t c[3], p[3];pthread_create(&c[0], nullptr, consumer, (void *)rq);pthread_create(&c[1], nullptr, consumer, (void *)rq);pthread_create(&c[2], nullptr, consumer, (void *)rq);pthread_create(&p[0], nullptr, producer, (void *)rq);pthread_create(&p[1], nullptr, producer, (void *)rq);pthread_create(&p[2], nullptr, producer, (void *)rq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(c[2], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);delete rq;return 0;
}
测试结果如下:

可以看到,长时间没有出错
如果将加锁解锁操作进行注释:

会出现段错误,因为访问了非法内存
2.2 优化加解锁
我们申请信号量和释放信号量是原子操作,不需要加锁解锁。无需将这个两个代码放入到临界区。并且一个线程在加锁期间,其他线程是可以申请信号量的!
// 生产者插入数据void push(const T &in){// 申请空间资源P(_spaceSem);// 生产者加锁,保证生产者与生产者之间的互斥pthread_mutex_lock(&_pmtx);// 插入数据_ringQueue[_producerStep++] = in;_producerStep %= _maxnum;// 解锁pthread_mutex_unlock(&_pmtx);// 释放一个数据资源V(_dataSem);}// 消费者获取数据void pop(T *out){// 申请数据资源P(_dataSem);// 消费者加锁pthread_mutex_lock(&_cmtx);// 插入数据*out = _ringQueue[_consumerStep++];_consumerStep %= _maxnum;// 解锁pthread_mutex_unlock(&_cmtx);// 释放一个空间资源V(_spaceSem);}
一样一来,一个线程加锁进入临界区之后不会去影响其他线程申请信号量。从而提高整体的效率。
2.3 多生产多消费的意义
与阻塞队列BlockQueue一样,多生产多消费的时候。生产线程生产数据可能需要很多时间,一个生产者生产者访问环形队列的时候不妨碍其他线程生产自己的资源。一个消费者访问环形队列拿数据的时候不妨碍其他消费者拿到数据进行处理
相关文章:
Linux操作系统7- 线程同步与互斥7(RingQueue环形队列生产者消费者模型改进)
上篇文章:Linux操作系统7- 线程同步与互斥6(POSIX信号量与环形队列生产者消费者模型)-CSDN博客 本篇代码仓库:myLerningCode/l36 橘子真甜/Linux操作系统与网络编程学习 - 码云 - 开源中国 (gitee.com) 目录 一. 单生产单消费单保…...
景区导游--LCA+树上前缀和
关键就是删点少删边,只删有关的边 LCA找最近祖先,树上前缀和记,画图找公式,特殊情况为删第一和最后 sum和前缀和开ll #include<bits/stdc.h> using namespace std; #define N 100011 typedef long long ll; typedef pair…...
将 Markdown 表格结构转换为Excel 文件
在数据管理和文档编写过程中,我们经常使用 Markdown 来记录表格数据。然而,Markdown 格式的表格在实际应用中不如 Excel 方便,特别是需要进一步处理数据时。因此,我们开发了一个使用 wxPython 的 GUI 工具,将 Markdown…...
OpenAI流式解析
OpenAI 流式的代码: 首选一般请使用os.getenv 去读环境变量的内容 注意使用pip install python-dotenv 的安装方法 load_dotenv 是这个库提供的一个函数,用于读取 .env 文件并将其中定义的键值对设置为系统的环境变量。 默认情况下,load_…...
Java动态生成Word终极指南:poi-tl与Aspose.Words性能对比及选型建议
在Java中实现复杂文档生成(如合同、报表)时,poi-tl、Aspose.Words 和 docx4j 是三个主流的模板技术方案。以下是它们的核心对比和选型建议: 1. poi-tl(基于Apache POI的模板引擎) 定位:轻量级开…...
微信小程序逆向开发
一.wxapkg文件 如何查看微信小程序包文件: 回退一级 点击进入这个目录 这个就是我们小程序对应的文件 .wxapkg概述 .wxapkg是微信小程序的包文件格式,且其具有独特的结构和加密方式。它不仅包含了小程序的源代码,还包括了图像和其他资源文…...
Spring Data审计利器:@LastModifiedDate详解!!!
🕒 Spring Data审计利器:LastModifiedDate详解🔥 🌟 简介 在数据驱动的应用中,记录数据的最后修改时间是常见需求。Spring Data的LastModifiedDate注解让这一过程自动化成为可能!本篇带你掌握它的核心用法…...
wms窗口/多窗口/自由窗口systemui侧边栏手势退出实战-学员作业
背景: 再学习了马哥的分屏自由窗口专题课程时候,有一个需求就是实现自由窗口置顶的功能,这个需求实现后,自由窗口就会一直处于顶端,不会因为打开其他Activity导致自由窗口退出。 不会因为打开了其他Activity而导致短…...
树莓派超全系列文档--(11)RaspberryOS上使用 Python控制GPIO
RaspberryOS上使用 Python控制GPIO 使用 Python 控制 GPIOLED 控制读取按键状态使用按钮控制LED 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 Python 控制 GPIO 使用 GPIO Zero 库可以轻松地用 Python 控制 GPIO 设备。该库在 gpiozero.…...
服装零售行业数据分析方案
在数据洪流的时代,大数据分析已成为服装产业的强大引擎,助力企业飞速提升运营效率,削减成本,并优化资源配置。在服饰行业的生产运营链中,商业智能(BI)工具扮演着至关重要的角色,它们…...
构建高可用性西门子Camstar服务守护者:异常监控与自愈实践
在智能制造领域,西门子Camstar作为领先的MES系统承载着关键生产业务。但在实际运维中,我们发现其服务常因数据库负载激增(如SQL阻塞链超时)或应用服务器资源耗尽(CPU峰值达90%以上)导致服务不可用。传统人工干预方式平均故障恢复时间长达47分钟,这对连续生产场景构成了严…...
基于大模型的pc版语音对话问答
Vosk基础知识: Vosk 是一个强大的开源语音识别工具包,以下是对它的详细介绍: 特点 离线识别:Vosk 的显著特点是支持离线语音识别。这意味着在没有网络连接的情况下,也能进行语音识别操作,避免了因网络问…...
深入理解 Linux 内核中的 GPU 子系统:从 DRM 到 NXP 驱动架构全解读
本文不仅为 GPU 子系统的深入复习笔记,更是一本面向 Linux 内核开发者、嵌入式图形系统开发人员的实践指南。本文围绕 drivers/gpu 展开,特别聚焦 NXP i.MX 系列平台的 GPU 架构和 Linux-imx 的实现方式,内容超 5000 字,适合收藏学…...
Go 语言标准库中path模块详细功能介绍与示例
Go语言的 path 模块提供了处理斜杠分隔路径的通用方法,适用于跨平台路径操作(如 URL 路径或 Unix 风格路径)。以下是 path 模块的核心方法及示例说明: 1. path.Base 返回路径的最后一个元素(类似 Unix 的 basename 命…...
鸿蒙NEXT开发App相关工具类
import bundleManager from ohos.bundle.bundleManager; import { KeyboardAvoidMode, window } from kit.ArkUI; import { common, ConfigurationConstant } from kit.AbilityKit;/*** App相关工具类(使用该工具前请在UIAbility的onWindowStageCreate方法中调用AppUtil的init方…...
Kafka 的高可用性
Kafka 的高可用性主要通过副本机制、ISR(In-Sync Replicas)列表和控制器 Broker 来实现。这些机制共同确保了 Kafka 集群在部分节点故障时仍然可以正常运行,数据不会丢失,并且服务不会中断。 1. 副本机制 Kafka 的副本机制是其高…...
docker 部署 postgresql 切换用户
① 启动容器 docker run -d --name postgres-e POSTGRES_PASSWORDpostgres-p 5432:5432 postgres su - omm gsql -d postgres -p 5432 # 将会在postgres下创建用户test1,在其他数据库下是无法删除此用户 CREATE USER test1 WITH Sysadmin IDENTIFIED BY Zcxzhf175…...
Allegro界面颜色改变设置
概述:本文主要讲解如何改变allegro的背景颜色,改为自己喜欢的颜色 1、 打开Allegro文件 2、 Setup—User Preference—UI—General—Allegro_theme选择Light即可 改变前 改变后...
【log4j】配置Slf4j
配置Slf4j 引入lombok包 <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.36</version><scope>provided</scope> </dependency>引入log4j相关api <dependency…...
ThreadLocal与Cookie + Session?
这篇文章主要在做 Echo 社区项目的时候写的,在保持用户登录态的这个需求下,为啥要用 ThreadLocal 存储用户信息,而不是采用常见的 Cookie Session。 Cookie Session 由于 HTTP 协议是无状态的,完成操作关闭浏览器后,…...
freecad手动装插件 add on
python工作台输入 FreeCAD.ConfigGet("UserAppData") 在返回的地址上新建文件夹:Mod #like /home/chen/snap/freecad/common 进入Mod #like /home/chen/snap/freecad/common/Mod git clone 你要的项目 #like git clone https://github.com/looooo/f…...
【算法】二分查找(下)
一、山峰数组的峰顶索引 题目链接:852. 山脉数组的峰顶索引 - 力扣(LeetCode) 题目描述: 给定一个长度为 n 的整数 山脉 数组 arr ,其中的值递增到一个 峰值元素 然后递减。 返回峰值元素的下标。 你必须设计并实现时…...
【动手学深度学习】#6 卷积神经网络
主要参考学习资料: 《动手学深度学习》阿斯顿张 等 著 【动手学深度学习 PyTorch版】哔哩哔哩跟李牧学AI 由于本系列一开始跳过了第一章引言部分,因此系列编号比书本章节编号提前。现改为和书本统一(因为之前自己的原始笔记也是按照书本章节编…...
认识一家公司:瑞芯微(Rockchip Electronics Co., Ltd.)以及旗下的两款芯片RK3288\RK3588
瑞芯微(Rockchip Electronics Co., Ltd.)简介 一、公司概况 瑞芯微电子股份有限公司(简称“瑞芯微”)成立于2001年,总部位于中国福建省福州市,是一家专注于集成电路设计与研发的高新技术企业。公司采用Fa…...
爬虫面试题
总结一下最近面试遇到的笔试题 1、解释Python中的init方法的作用。 在Python中,__init__方法是一种特殊的构造方法,主要用于在创建类的实例时初始化对象。至少接受至少一个参数:self,它是对当前实例的引用,可以通过添加其他参数…...
Netty——零拷贝
文章目录 1. 什么是零拷贝?2. 为什么需要零拷贝?2.1 传统 I/O 的拷贝流程2.2 零拷贝的优化2.2.1 通过 sendfile 系统调用2.2.2 通过 mmap (内存映射) 系统调用 3. Netty 实现零拷贝的方式3.1 文件传输优化:FileRegion 封装3.2 直接内存 (Dire…...
Java制作简单的聊天室(复习)
设计的知识点:几乎包含java基础的全部知识点(java基础语法,java基础进阶:双列集合,io流,多线程,网络编程等) 代码如下 客户端: 服务器采用的时多线程的循环多线程的方式…...
ES 字段的映射定义了字段的类型及其行为
在 Elasticsearch 中,字段的映射定义了字段的类型及其行为。你提供的 content_answer 字段映射如下: Json 深色版本 "content_answer": { "type": "text", "fields": { "keyword": { …...
Android开发点击字符串web链接跳到系统浏览器上
Android开发点击字符串web链接跳到系统浏览器上 直接上代码:用到你就拿去用 public static void performItemUrlClick(View view, String contentUrl) {if (!TextUtils.isEmpty(contentUrl)) {Intent intent new Intent();if (!contentUrl.startsWith("http…...
运维规则之总结(Summary of Operation and Maintenance Rules)
运维规则之总结 在运维领域,经验和流程往往决定了系统的稳定性与可靠性。一个运维人,总结出了以下10条运维规则,涵盖了从基础管理到高级策略的全面内容,旨在帮助运维人员更好地应对各种挑战,确保系统的平稳运行。 1.…...
