【Linux取经路】基于信号量和环形队列的生产消费者模型
文章目录
- 一、POSIX 信号量
- 二、POSIX 信号量的接口
- 2.1 sem_init——初始化信号量
- 2.2 sem_destroy——销毁信号量
- 2.3 sem_wait——等待信号量
- 2.4 sem_post——发布信号量
- 三、基于环形队列的生产消费者模型
- 3.1 单生产单消费模型
- 3.2 多生产多消费模型
- 3.3 基于任务的多生产多消费模型
- 四、结语
一、POSIX 信号量
共享资源也可以被看成多份,只要规定好每个线程的访问区域即可,此时就可以让多线程去并发的访问临界资源。
POSIX
信号量和 SystemV
信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的。但 POSIX
可以用于线程间同步。信号量本质是一把计数器,用来描述可用资源数目的,申请信号量时,其实就已经在间接的做判断,看资源是否就绪了,只要申请到信号量,那么说明资源一定是就绪的。
信号量只能保证,不让多余的线程来访问共享资源,即,当前共享资源有十份,信号量不会允许同时有十一个线程来访问临界资源。但是具体的资源分配是通过程序员编码去实现的。如果出现一个共享资源同时被两个线程访问,就属于程序员的编码 Bug。
二、POSIX 信号量的接口
2.1 sem_init——初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
-
sem
:要初始化的信号量 -
pshared
:0表示线程间共享,非0表示进程间共享。 -
value
:信号量初始值
2.2 sem_destroy——销毁信号量
int sem_destroy(sem_t *sem);
2.3 sem_wait——等待信号量
int sem_wait(sem_t *sem); //P()
- 功能:会将信号量的值减1
2.4 sem_post——发布信号量
int sem_post(sem_t *sem);//V()
- 功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量的值加1
三、基于环形队列的生产消费者模型
只要生产和消费不访问同一个格子,那么生产和消费就可以同时进行。那生产和消费什么时候会指向同一个数据呢?答案是队列为空和为满的时候。
基于环形队列的生产消费者模型必须遵守以下三个原则:
-
当生产和消费指向同一个资源的时候,只能一个人访问。为空的时候,由生产者去访问;为满的时候,由消费者去访问
-
消费者不能超过生产者
-
生产者不能把消费者套圈,因为这样会导致数据被覆盖
生产者最关心还剩多少空间(空间数量);消费者最关系还剩多少数据(数据数量)。因为有两种资源,所以需要定义两个信号量。
3.1 单生产单消费模型
// RingQueue.hpp
#pragma once#include <pthread.h>
#include <vector>
#include <semaphore.h>template<class T>
class RingQueue
{
private:static const int defaultcap = 5;void P(sem_t *sem) // 申请一个信号量{sem_wait(sem); }void V(sem_t *sem) // 归还一个信号量{sem_post(sem);}
public:RingQueue(int cap = defaultcap):ringqueue_(cap), cap_(cap), c_step(0), p_step(0){sem_init(&cdata_sem, 0, 0);sem_init(&pspace_sem, 0, cap_);}void Push(const T &data) // 生产行为{P(&pspace_sem);ringqueue_[p_step] = data;V(&cdata_sem);p_step++;p_step %= cap_;}void Pop(T *out) // 消费行为{P(&cdata_sem);*out = ringqueue_[c_step];V(&pspace_sem);c_step++;c_step %= cap_;}~RingQueue(){sem_destroy(&cdata_sem);sem_destroy(&pspace_sem);}
private:std::vector<T> ringqueue_; // 环形队列int cap_; // 容量int c_step; // 消费者下一个要消费的位置int p_step; // 生产者下一个要生产的位置sem_t cdata_sem; // 数据资源sem_t pspace_sem; // 空间资源
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>using namespace std;void *Consumer(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);while(true){int data = 0;rq->Pop(&data);cout << "Consumer is running... get a data: " << data << endl;// 模拟处理数据usleep(1000000);}
}void *Productor(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);while(true){// 获取数据usleep(10000); // 模拟获取数据int data = rand() % 10;rq->Push(data);cout << "Productor is running... produce a data: " << data << endl;}
}int main()
{srand((unsigned int)time(nullptr));pthread_t c, p;RingQueue<int> *rq = new RingQueue<int>();pthread_create(&c, nullptr, Consumer, rq);pthread_create(&p, nullptr, Productor, rq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}
互斥与同步的体现:当生产下标和消费下标相同的时候,只允许一个来访问,这就是互斥性的体现。当队列为空的时候,让生产者去访问资源,当队列为满的时候,让消费者去访问资源,这就是在指向同一个位置时,让生产和消费具有一定的顺序性,这就是同步性的体现。当队列不为空或不为满的时候,生产下标和消费下标不同,此时两个线程并发执行,并没有体现出很强的互斥特性。
3.2 多生产多消费模型
此时需要对下标资源进行保护。因为生产下标和消费下标各自只有一份,不允许同时有多个生产线程去访问生产下标,消费线程也一样。因此需要通过加锁来实现生产线程之间的互斥和消费线程之间的互斥。
先加锁还是先申请信号量?答案是先申请信号量,以生产线程为例,这样可以让所有生产线程并发的去执行,什么意思呢?如果是先加锁再申请信号量的话,因为始终只有一个生产者线程能够申请到锁,所以也就只有一个生产者线程能去申请信号量,其他生产者线程只能干巴巴的等待锁被释放。这时申请锁和申请信号量的动作是串行的。而先申请信号量的话,可以保证虽然只有一个线程能够申请到锁,但是其他没有锁的线程也可以不用闲着,可以先去申请信号量,因为信号量的申请是原子的,因此也不需要加锁进行保护,只要能申请到信号量,就说明资源还有,此时那些申请到信号量的线程就可能等待锁被释放,拿到锁之后就可以去执行相应的代码了。
// RingQueue.hpp
#pragma once#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <string>template<class T>
class RingQueue
{
private:static const int defaultcap = 5;void P(sem_t *sem) // 申请一个信号量{sem_wait(sem); }void V(sem_t *sem) // 归还一个信号量{sem_post(sem);}void Lock(pthread_mutex_t *mutex){pthread_mutex_lock(mutex);}void Unlock(pthread_mutex_t *mutex){pthread_mutex_unlock(mutex);}
public:RingQueue(int cap = defaultcap):ringqueue_(cap), cap_(cap), c_step(0), p_step(0){sem_init(&cdata_sem, 0, 0);sem_init(&pspace_sem, 0, cap_);pthread_mutex_init(&c_mutex, nullptr);pthread_mutex_init(&p_mutex, nullptr);}void Push(const T &data) // 生产行为{P(&pspace_sem);Lock(&p_mutex);ringqueue_[p_step] = data;p_step++;p_step %= cap_;Unlock(&p_mutex);V(&cdata_sem);}void Pop(T *out) // 消费行为{P(&cdata_sem); // 信号量资源是不需要保护的,因为它的操作是原子的,临界区中的代码要尽可能的少,所以不需要把信号量的申请放在加锁之后Lock(&c_mutex);*out = ringqueue_[c_step];c_step++;c_step %= cap_;Unlock(&c_mutex);V(&pspace_sem);}~RingQueue(){sem_destroy(&cdata_sem);sem_destroy(&pspace_sem);pthread_mutex_destroy(&c_mutex);pthread_mutex_destroy(&p_mutex);}
private:std::vector<T> ringqueue_; // 环形队列int cap_; // 容量int c_step; // 消费者下一个要消费的位置int p_step; // 生产者下一个要生产的位置sem_t cdata_sem; // 数据资源sem_t pspace_sem; // 空间资源pthread_mutex_t c_mutex; // 对消费下标的保护pthread_mutex_t p_mutex; // 对生产下标的保护
};template <class T>
class Message
{
public:Message(std::string thread_name, RingQueue<T> *ringqueue):thread_name_(thread_name), ringqueue_(ringqueue){}std::string &get_thread_name(){return thread_name_;}RingQueue<T> *get_ringqueue(){return ringqueue_;}
private:std::string thread_name_;RingQueue<T> *ringqueue_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>using namespace std;void *Consumer(void *args)
{Message<int> *message = static_cast<Message<int> *>(args);RingQueue<int> *rq = message->get_ringqueue();string name = message->get_thread_name();while (true){int data = 0;rq->Pop(&data);printf("%s is running... get a data: %d\n", name.c_str(), data);// 模拟处理数据// usleep(1000000);}
}void *Productor(void *args)
{Message<int> *message = static_cast<Message<int> *>(args);RingQueue<int> *rq = message->get_ringqueue();string name = message->get_thread_name();while (true){// 获取数据// usleep(1000000); // 模拟获取数据int data = rand() % 10;rq->Push(data);printf("%s is running... produce a data: %d\n", name.c_str(), data);usleep(1000000);}
}int main()
{srand((unsigned int)time(nullptr));pthread_t c[3], p[5];RingQueue<int> *rq = new RingQueue<int>(); vector<Message<int>*> messages; for (int i = 0; i < 5; i++){Message<int> *message = new Message<int>("Produttor Thread "+to_string(i), rq);pthread_create(p + i, nullptr, Productor, message);messages.push_back(message);}for (int i = 0; i < 3; i++){Message<int> *message = new Message<int>("Consumer Thread "+to_string(i), rq);pthread_create(c + i, nullptr, Consumer, message);messages.push_back(message);}for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}for (auto message : messages){delete message;}delete rq;return 0;
}
3.3 基于任务的多生产多消费模型
RingQueue
的内容不变
// Task.h
#include <iostream>
#include <string>enum
{DIVERROR = 1,MODERROR,UNKNOWERRROR
};class Task
{
public:Task(int a = 0, int b = 0, char op = '+'):data1_(a), data2_(b), op_(op), result_(0), exitcode_(0){}void run(){switch(op_){case '+':result_ = data1_ + data2_;break;case '-':result_ = data1_ - data2_;break;case '*':result_ = data1_ * data2_;break;case '/':if(data2_ == 0) exitcode_ = DIVERROR;else result_ = data1_ / data2_;break;case '%':if(data2_ == 0) exitcode_ = MODERROR;else result_ = data1_ % data2_;break;default:exitcode_ = UNKNOWERRROR;break;}}std::string result_to_string(){std::string ret = std::to_string(data1_);ret += ' ';ret += op_;ret += ' ';ret += std::to_string(data2_);ret += ' ';ret += '=';ret += ' ';ret += std::to_string(result_);ret += "[exitcode: ";ret += std::to_string(exitcode_);ret += ']';return ret;}std::string get_task(){std::string ret = std::to_string(data1_);ret += ' ';ret += op_;ret += ' ';ret += std::to_string(data2_);ret += ' ';ret += '=';ret += ' ';ret += '?';return ret;}
private:int data1_;int data2_;char op_;int result_;int exitcode_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>
#include "Task.h"using namespace std;const std::string opers = "+-*/%";void *Consumer(void *args)
{Message<Task> *message = static_cast<Message<Task> *>(args);RingQueue<Task> *rq = message->get_ringqueue();string name = message->get_thread_name();while (true){// 获取任务// int data = 0;Task task;rq->Pop(&task);// 对任务做处理task.run();printf("%s is running... get a data: %s\n", name.c_str(), task.result_to_string().c_str());// 模拟处理数据// usleep(1000000);}
}void *Productor(void *args)
{Message<Task> *message = static_cast<Message<Task> *>(args);RingQueue<Task> *rq = message->get_ringqueue();string name = message->get_thread_name();int len = opers.size();while (true){// 获取数据// usleep(1000000); // 模拟获取数据// int data = rand() % 10;// 模拟获取数据int data1 = rand() % 10 + 1; // [1, 10]usleep(10);int data2 = rand() % 13; // [0, 13]usleep(10);char op = opers[rand() % len];Task task(data1, data2, op);// 生产数据rq->Push(task);// printf("%s is running... produce a data: %d\n", name.c_str(), data);printf("%s is running... produce a Task: %s\n", name.c_str(), task.get_task().c_str());usleep(1000000);}
}int main()
{srand((unsigned int)time(nullptr));pthread_t c[3], p[2];RingQueue<Task> *rq = new RingQueue<Task>(); vector<Message<Task>*> messages; for (int i = 0; i < 5; i++){Message<Task> *message = new Message<Task>("Produttor Thread "+to_string(i), rq);pthread_create(p + i, nullptr, Productor, message);messages.push_back(message);}for (int i = 0; i < 3; i++){Message<Task> *message = new Message<Task>("Consumer Thread "+to_string(i), rq);pthread_create(c + i, nullptr, Consumer, message);messages.push_back(message);}// 等待子线程for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}// 释放资源for (auto message : messages){delete message;}delete rq;return 0;
}
四、结语
今天的分享到这里就结束啦!如果觉得文章还不错的话,可以三连支持一下,春人的主页还有很多有趣的文章,欢迎小伙伴们前去点评,您的支持就是春人前进的动力!
相关文章:

【Linux取经路】基于信号量和环形队列的生产消费者模型
文章目录 一、POSIX 信号量二、POSIX 信号量的接口2.1 sem_init——初始化信号量2.2 sem_destroy——销毁信号量2.3 sem_wait——等待信号量2.4 sem_post——发布信号量 三、基于环形队列的生产消费者模型3.1 单生产单消费模型3.2 多生产多消费模型3.3 基于任务的多生产多消费模…...

计算机SCI期刊,中科院2区,收稿范围非常广泛!
一、期刊名称 Journal of Web Semantics 二、期刊简介概况 期刊类型:SCI 学科领域:计算机科学 影响因子:2.5 中科院分区:2区 出版方式:开放出版 版面费:$1600 三、期刊征稿范围 《网络语义学杂志》…...

JDK、JRE、编译指令和垃圾回收机制详解
JDK 全称 Java SE Development Kit (Java 开发工具包) JVM虚拟机:Java运行的地方 核心类库:Java提前编好的东西 开发工具: javac,java,jdb,jhat javac:Java编译器,用于将Java源代码编译成Java字节码文件(.class)。 java: java…...

【ARM 嵌入式 C 入门及渐进 6.2 -- ARMv8 C 内嵌汇编读系统寄存器的函数实现】
请阅读【嵌入式开发学习必备专栏】 文章目录 ARMv8 C 内嵌汇编读系统寄存器 ARMv8 C 内嵌汇编读系统寄存器 要在ARMv8架构中通过C代码和内嵌汇编来读取系统寄存器s3_0_c15_c5_5的值,并将其返回,可以按照以下方式实现system_read_reg函数: #…...

使用 LlamaParse 进行 PDF 解析并创建知识图谱
此 Python 笔记本提供了有关利用 LlamaParse 从 PDF 文档中提取信息并随后将提取的内容存储到 Neo4j 图形数据库中的综合指南。本教程在设计时考虑到了实用性,适合对文档处理、信息提取和图形数据库技术感兴趣的开发人员、数据科学家和技术爱好者。 该笔记本电脑的主…...

Oracle行迁移解析
行迁移(Row Migration)是Oracle数据库中的另一个现象,它与行链接类似,都是由于数据行大小的变化导致的存储问题,但其本质和影响有所不同。 触发条件:行迁移发生在当一个已存在的、原先能够完全存储在一个数…...

【k8s】 busybox镜像、挂载volume卷
1. 概述 busybox是一个包含了nslookup,ping,wget等网络处理命令的Pod容器(不含curl命令),它的体积非常小,适合做一些容器内的网络调试。 即创建一个docker ,进去执行 ping 命令等 2. 启动容器 2.1 会自动退出&…...

文本三剑客之 sed 编辑器
一.sed 概述 1.sed 介绍 sed是一种流编辑器,流编辑器会在编辑器处理数据之前基于预先提供的一组规则来编辑数据流。 sed编辑器可以根据命令来处理数据流中的数据,这些命令要么从命令行中输入,要么存储在一个 命令文本文件中。 2.sed 的工…...

【MySQL精通之路】SQL优化(1)-查询优化(3)-索引合并
主博客: 【MySQL精通之路】SQL优化(1)-CSDN博客 上一篇: 【MySQL精通之路】SQL优化(1)-查询优化(2)-范围查询优化-CSDN博客 下一篇: 目录 1.索引合并-交集访问算法 2.索引合并联合访问算法 3.索引合并-排序联合访问算法 4.影响索引合…...

Linux中安装配置并使用samba服务(Centos以及Ubuntu)
目录 前言1. 基本知识2. Centos3. Ubuntu3.1 物理服务器3.2 云服务器前言 在window与linux系统中配置一个共享文件夹,可以做很多时间,比如映射器或者像linux中定时存放文件等 1. 基本知识 在Ubuntu上安装和配置Samba服务可以让你的Ubuntu机器与Windows、macOS以及其他Linu…...

three.js能实现啥效果?看过来,这里都是它的菜(06)
这是第五期了,本期继续分享three.js可以实现的3D动画案例,有老铁反馈再发案例的时候,是否可以顺道分享一下three.js的知识点,好吧,安排。 材质动画 材质动画可以实现各种复杂的视觉效果,包括但不限于以下…...

利用ESP32-C3将TF卡内容变成U盘进行读取
利用ESP32-C3将TF卡内容变成U盘进行读取 ESP32-C3是一款高性价比的微控制器,具备WiFi和蓝牙功能,广泛应用于物联网(IoT)项目中。除了常见的无线通信功能外,ESP32-C3还可以用来模拟U盘读取TF卡内容。本文将介绍如何通过…...

C++小病毒
C小病毒(注:对电脑无过大伤害) 短短行,创造奇迹! 把这个文件命名为virus.exe就可以使用了。 #include<bits/stdc.h> #include<windows.h> using namespace std; int main() {HWND hwnd GetForegroundW…...

使用VUE3+TS+elementplus创建一个增加按钮
一、前言 在上一篇文章中分享了创建table的过程,详见(VUE3TSelementplus创建table,纯前端的table),本文在创建好的table的基础上,再创建一个增加按钮。 二、程序展示 1、前面创建table的程序 <templ…...

Python面试宝典:文件读写和上下文管理器以及输入输出流面试题(1000加python面试题助你轻松捕获大厂Offer)
Python面试宝典:1000加python面试题助你轻松捕获大厂Offer【第一部分:Python基础:第八章:文件操作和输入输出:第一节:文件读写和上下文管理器以及输入输出流】 第八章:文件操作和输入输出第一节:文件读写和上下文管理器以及输入输出流1.1、文件读写基本操作1.1.1、打开…...

Spring Boot | Spring Boot 实现 “记住我“ 功能
目录: 一、SpringBoot 中 自定义 "用户授权管理" ( 总体内容介绍 )二、实现 "记住我" 功能 ( 通过 "HttpSecurity类" 的 rememberMe( )方法来实现 "记住我" 功能 ) :2.1 基于 "简单加密 Token" 的方式 ( 实现 "记住我&…...

AGM AG32 MCU替代GD32F407/STM32F407的手持示波器方案
一、AG32 MCU替代GD32F407/STM32F407的数字示波器方案 市场上有多款数字迷你示波器,特别小巧,携带非常方便。单通道,标注100Mhz带宽,500MS采样率。 这样高的采样率需要使用高速AD及很高性能的处理器。原方案使用AD9288双通道8bit …...

MongoDB基础入门到深入(八)MongoDB整合SpringBoot、Chang Streams
文章目录 系列文章索引十五、MongoDB整合SpringBoot1、环境准备2、集合操作3、文档操作(1)相关注解(2)创建实体(3)添加文档(4)查询文档(5)更新文档࿰…...

linux下的nfs概述与实验(openEuler22.03)
目录 什么是NFSNFS工作原理NFS常用文件/etc/exports文件/etc/exports格式 /var/lib/nfs/etab文件 NFS常用选项NFS配置方式1. 安装NFS和RPC2. 启动rpcbind和nfs3. 准备目录4. 配置/etc/exports并重启服务5. 连接测试服务端自我测试客户端测试 补充常用命令永久挂载注意 什么是NF…...

影响Oracle数据库打开速度的因素
当Oracle发起一个事务需要更改数据时,如果所涉及的数据块不在BUFFER CACHE中,那么Oracle服务进程首先会将相关数据块从数据文件中读进BUFFER CACHE进行更改(直接路径读除外),更改后的数据块称之为脏块(DIRT…...

dify:开源 LLMOps平台。
单纯笔记: 一、关于 Dify dify/README_CN.md at main langgenius/dify GitHub Dify 是一款开源的大语言模型(LLM)应用开发平台。它融合了后端即服务(Backend as Service)和 LLMOps 的理念,使开发者可以…...

CCF20220601——归一化处理
CCF20220601——归一化处理 代码如下: #include<bits/stdc.h> using namespace std; int main() {int n,a[1000],sum0;scanf("%d",&n);for(int i1;i<n;i){scanf("%d",&a[i]);suma[i];}double aver1.0,b0.0,d1.0;aversum/(n*1…...

用Ai编写一个电机驱动程序
问:帮我写一个步进电机的控制程序,要有包括加减速,以及电机步中断处理函数。 答:编写一个步进电机的控制程序涉及到硬件层面的操作,通常需要依赖特定的硬件平台和相应的驱动程序。以下是一个简化的示例,它展…...

【C++入门】—— C++入门 (下)_内联函数
前言:在了解完前面的C基础内容后,马上我们就要真正不如C的学习了,但在之前让我们最后了解最后一点点C入门知识!来迟的520特别篇! 本篇主要内容: 内联函数 auto关键字 范围for 指针空值nullptr C入门 1. 内联…...

Java数据结构与算法(最小栈)
前言 设计一个支持 push ,pop ,top 操作,并能在常数时间内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void pop() 删除堆栈顶部的元素。int top() 获取堆栈顶部的元素。i…...

7 Series FPGAs Integrated Block for PCI Express IP核 Advanced模式配置详解(三)
1 TL Settings Transaction Layer (TL)设置只在Advanced模式下有效。 Endpoint: Unlock and PME_Turn_Off Messages: 与端点的电源管理相关,允许发送解锁和电源管理事件关闭消息。 Root Port: Error Messages: Error Correctable(错误可纠正)…...

k8s 部署mqtt简介
在Kubernetes(K8s)中部署MQTT(Message Queuing Telemetry Transport)服务通常涉及以下几个步骤: 选择MQTT Broker MQTT Broker是MQTT消息传递的中间件。流行的MQTT Broker包括Mosquitto, HiveMQ, EMQ X等。你需要选择一…...

汇凯金业:量化交易中常用的数学模型有哪些
量化交易中运用了多种数学模型来识别市场的潜在机会和建立交易策略。以下是一些在量化交易中常用的数学模型: 1. 时间序列分析模型 时间序列分析是研究和预测数据点随时间顺序变化趋势的方法。在量化交易中,常用的时间序列模型包括: 自回归&a…...

局部直方图均衡化去雾算法
目录 1. 引言 2. 算法流程 3. 代码 4. 去雾效果 1. 引言 局部直方图算法是一种基于块的图像去雾方法,它将图像分割为若干个块,并在每个块内计算块的局部直方图。通过对各个块的直方图进行分析和处理,该算法能够更好地适应图像中不同区域的…...

selenium环境安装和web自动化基础
webUI自动化背景 因为web页面经常会变化,所以UI自动化测试的维护成本很高。不如接口的适用面广,所以大部分公司会做接口自动化测试,但是未必会做UI自动化测试; UI自动化测试要做也是覆盖冒烟测试,不会到很高的覆盖率&a…...