【Linux取经路】基于信号量和环形队列的生产消费者模型
文章目录
- 一、POSIX 信号量
- 二、POSIX 信号量的接口
- 2.1 sem_init——初始化信号量
- 2.2 sem_destroy——销毁信号量
- 2.3 sem_wait——等待信号量
- 2.4 sem_post——发布信号量
- 三、基于环形队列的生产消费者模型
- 3.1 单生产单消费模型
- 3.2 多生产多消费模型
- 3.3 基于任务的多生产多消费模型
- 四、结语

一、POSIX 信号量
共享资源也可以被看成多份,只要规定好每个线程的访问区域即可,此时就可以让多线程去并发的访问临界资源。
POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的。但 POSIX 可以用于线程间同步。信号量本质是一把计数器,用来描述可用资源数目的,申请信号量时,其实就已经在间接的做判断,看资源是否就绪了,只要申请到信号量,那么说明资源一定是就绪的。
信号量只能保证,不让多余的线程来访问共享资源,即,当前共享资源有十份,信号量不会允许同时有十一个线程来访问临界资源。但是具体的资源分配是通过程序员编码去实现的。如果出现一个共享资源同时被两个线程访问,就属于程序员的编码 Bug。
二、POSIX 信号量的接口
2.1 sem_init——初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
-
sem:要初始化的信号量 -
pshared:0表示线程间共享,非0表示进程间共享。 -
value:信号量初始值
2.2 sem_destroy——销毁信号量
int sem_destroy(sem_t *sem);
2.3 sem_wait——等待信号量
int sem_wait(sem_t *sem); //P()
- 功能:会将信号量的值减1
2.4 sem_post——发布信号量
int sem_post(sem_t *sem);//V()
- 功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量的值加1
三、基于环形队列的生产消费者模型
只要生产和消费不访问同一个格子,那么生产和消费就可以同时进行。那生产和消费什么时候会指向同一个数据呢?答案是队列为空和为满的时候。


基于环形队列的生产消费者模型必须遵守以下三个原则:
-
当生产和消费指向同一个资源的时候,只能一个人访问。为空的时候,由生产者去访问;为满的时候,由消费者去访问
-
消费者不能超过生产者
-
生产者不能把消费者套圈,因为这样会导致数据被覆盖
生产者最关心还剩多少空间(空间数量);消费者最关系还剩多少数据(数据数量)。因为有两种资源,所以需要定义两个信号量。
3.1 单生产单消费模型
// RingQueue.hpp
#pragma once#include <pthread.h>
#include <vector>
#include <semaphore.h>template<class T>
class RingQueue
{
private:static const int defaultcap = 5;void P(sem_t *sem) // 申请一个信号量{sem_wait(sem); }void V(sem_t *sem) // 归还一个信号量{sem_post(sem);}
public:RingQueue(int cap = defaultcap):ringqueue_(cap), cap_(cap), c_step(0), p_step(0){sem_init(&cdata_sem, 0, 0);sem_init(&pspace_sem, 0, cap_);}void Push(const T &data) // 生产行为{P(&pspace_sem);ringqueue_[p_step] = data;V(&cdata_sem);p_step++;p_step %= cap_;}void Pop(T *out) // 消费行为{P(&cdata_sem);*out = ringqueue_[c_step];V(&pspace_sem);c_step++;c_step %= cap_;}~RingQueue(){sem_destroy(&cdata_sem);sem_destroy(&pspace_sem);}
private:std::vector<T> ringqueue_; // 环形队列int cap_; // 容量int c_step; // 消费者下一个要消费的位置int p_step; // 生产者下一个要生产的位置sem_t cdata_sem; // 数据资源sem_t pspace_sem; // 空间资源
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>using namespace std;void *Consumer(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);while(true){int data = 0;rq->Pop(&data);cout << "Consumer is running... get a data: " << data << endl;// 模拟处理数据usleep(1000000);}
}void *Productor(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);while(true){// 获取数据usleep(10000); // 模拟获取数据int data = rand() % 10;rq->Push(data);cout << "Productor is running... produce a data: " << data << endl;}
}int main()
{srand((unsigned int)time(nullptr));pthread_t c, p;RingQueue<int> *rq = new RingQueue<int>();pthread_create(&c, nullptr, Consumer, rq);pthread_create(&p, nullptr, Productor, rq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

互斥与同步的体现:当生产下标和消费下标相同的时候,只允许一个来访问,这就是互斥性的体现。当队列为空的时候,让生产者去访问资源,当队列为满的时候,让消费者去访问资源,这就是在指向同一个位置时,让生产和消费具有一定的顺序性,这就是同步性的体现。当队列不为空或不为满的时候,生产下标和消费下标不同,此时两个线程并发执行,并没有体现出很强的互斥特性。
3.2 多生产多消费模型
此时需要对下标资源进行保护。因为生产下标和消费下标各自只有一份,不允许同时有多个生产线程去访问生产下标,消费线程也一样。因此需要通过加锁来实现生产线程之间的互斥和消费线程之间的互斥。
先加锁还是先申请信号量?答案是先申请信号量,以生产线程为例,这样可以让所有生产线程并发的去执行,什么意思呢?如果是先加锁再申请信号量的话,因为始终只有一个生产者线程能够申请到锁,所以也就只有一个生产者线程能去申请信号量,其他生产者线程只能干巴巴的等待锁被释放。这时申请锁和申请信号量的动作是串行的。而先申请信号量的话,可以保证虽然只有一个线程能够申请到锁,但是其他没有锁的线程也可以不用闲着,可以先去申请信号量,因为信号量的申请是原子的,因此也不需要加锁进行保护,只要能申请到信号量,就说明资源还有,此时那些申请到信号量的线程就可能等待锁被释放,拿到锁之后就可以去执行相应的代码了。
// RingQueue.hpp
#pragma once#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <string>template<class T>
class RingQueue
{
private:static const int defaultcap = 5;void P(sem_t *sem) // 申请一个信号量{sem_wait(sem); }void V(sem_t *sem) // 归还一个信号量{sem_post(sem);}void Lock(pthread_mutex_t *mutex){pthread_mutex_lock(mutex);}void Unlock(pthread_mutex_t *mutex){pthread_mutex_unlock(mutex);}
public:RingQueue(int cap = defaultcap):ringqueue_(cap), cap_(cap), c_step(0), p_step(0){sem_init(&cdata_sem, 0, 0);sem_init(&pspace_sem, 0, cap_);pthread_mutex_init(&c_mutex, nullptr);pthread_mutex_init(&p_mutex, nullptr);}void Push(const T &data) // 生产行为{P(&pspace_sem);Lock(&p_mutex);ringqueue_[p_step] = data;p_step++;p_step %= cap_;Unlock(&p_mutex);V(&cdata_sem);}void Pop(T *out) // 消费行为{P(&cdata_sem); // 信号量资源是不需要保护的,因为它的操作是原子的,临界区中的代码要尽可能的少,所以不需要把信号量的申请放在加锁之后Lock(&c_mutex);*out = ringqueue_[c_step];c_step++;c_step %= cap_;Unlock(&c_mutex);V(&pspace_sem);}~RingQueue(){sem_destroy(&cdata_sem);sem_destroy(&pspace_sem);pthread_mutex_destroy(&c_mutex);pthread_mutex_destroy(&p_mutex);}
private:std::vector<T> ringqueue_; // 环形队列int cap_; // 容量int c_step; // 消费者下一个要消费的位置int p_step; // 生产者下一个要生产的位置sem_t cdata_sem; // 数据资源sem_t pspace_sem; // 空间资源pthread_mutex_t c_mutex; // 对消费下标的保护pthread_mutex_t p_mutex; // 对生产下标的保护
};template <class T>
class Message
{
public:Message(std::string thread_name, RingQueue<T> *ringqueue):thread_name_(thread_name), ringqueue_(ringqueue){}std::string &get_thread_name(){return thread_name_;}RingQueue<T> *get_ringqueue(){return ringqueue_;}
private:std::string thread_name_;RingQueue<T> *ringqueue_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>using namespace std;void *Consumer(void *args)
{Message<int> *message = static_cast<Message<int> *>(args);RingQueue<int> *rq = message->get_ringqueue();string name = message->get_thread_name();while (true){int data = 0;rq->Pop(&data);printf("%s is running... get a data: %d\n", name.c_str(), data);// 模拟处理数据// usleep(1000000);}
}void *Productor(void *args)
{Message<int> *message = static_cast<Message<int> *>(args);RingQueue<int> *rq = message->get_ringqueue();string name = message->get_thread_name();while (true){// 获取数据// usleep(1000000); // 模拟获取数据int data = rand() % 10;rq->Push(data);printf("%s is running... produce a data: %d\n", name.c_str(), data);usleep(1000000);}
}int main()
{srand((unsigned int)time(nullptr));pthread_t c[3], p[5];RingQueue<int> *rq = new RingQueue<int>(); vector<Message<int>*> messages; for (int i = 0; i < 5; i++){Message<int> *message = new Message<int>("Produttor Thread "+to_string(i), rq);pthread_create(p + i, nullptr, Productor, message);messages.push_back(message);}for (int i = 0; i < 3; i++){Message<int> *message = new Message<int>("Consumer Thread "+to_string(i), rq);pthread_create(c + i, nullptr, Consumer, message);messages.push_back(message);}for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}for (auto message : messages){delete message;}delete rq;return 0;
}

3.3 基于任务的多生产多消费模型
RingQueue 的内容不变
// Task.h
#include <iostream>
#include <string>enum
{DIVERROR = 1,MODERROR,UNKNOWERRROR
};class Task
{
public:Task(int a = 0, int b = 0, char op = '+'):data1_(a), data2_(b), op_(op), result_(0), exitcode_(0){}void run(){switch(op_){case '+':result_ = data1_ + data2_;break;case '-':result_ = data1_ - data2_;break;case '*':result_ = data1_ * data2_;break;case '/':if(data2_ == 0) exitcode_ = DIVERROR;else result_ = data1_ / data2_;break;case '%':if(data2_ == 0) exitcode_ = MODERROR;else result_ = data1_ % data2_;break;default:exitcode_ = UNKNOWERRROR;break;}}std::string result_to_string(){std::string ret = std::to_string(data1_);ret += ' ';ret += op_;ret += ' ';ret += std::to_string(data2_);ret += ' ';ret += '=';ret += ' ';ret += std::to_string(result_);ret += "[exitcode: ";ret += std::to_string(exitcode_);ret += ']';return ret;}std::string get_task(){std::string ret = std::to_string(data1_);ret += ' ';ret += op_;ret += ' ';ret += std::to_string(data2_);ret += ' ';ret += '=';ret += ' ';ret += '?';return ret;}
private:int data1_;int data2_;char op_;int result_;int exitcode_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>
#include "Task.h"using namespace std;const std::string opers = "+-*/%";void *Consumer(void *args)
{Message<Task> *message = static_cast<Message<Task> *>(args);RingQueue<Task> *rq = message->get_ringqueue();string name = message->get_thread_name();while (true){// 获取任务// int data = 0;Task task;rq->Pop(&task);// 对任务做处理task.run();printf("%s is running... get a data: %s\n", name.c_str(), task.result_to_string().c_str());// 模拟处理数据// usleep(1000000);}
}void *Productor(void *args)
{Message<Task> *message = static_cast<Message<Task> *>(args);RingQueue<Task> *rq = message->get_ringqueue();string name = message->get_thread_name();int len = opers.size();while (true){// 获取数据// usleep(1000000); // 模拟获取数据// int data = rand() % 10;// 模拟获取数据int data1 = rand() % 10 + 1; // [1, 10]usleep(10);int data2 = rand() % 13; // [0, 13]usleep(10);char op = opers[rand() % len];Task task(data1, data2, op);// 生产数据rq->Push(task);// printf("%s is running... produce a data: %d\n", name.c_str(), data);printf("%s is running... produce a Task: %s\n", name.c_str(), task.get_task().c_str());usleep(1000000);}
}int main()
{srand((unsigned int)time(nullptr));pthread_t c[3], p[2];RingQueue<Task> *rq = new RingQueue<Task>(); vector<Message<Task>*> messages; for (int i = 0; i < 5; i++){Message<Task> *message = new Message<Task>("Produttor Thread "+to_string(i), rq);pthread_create(p + i, nullptr, Productor, message);messages.push_back(message);}for (int i = 0; i < 3; i++){Message<Task> *message = new Message<Task>("Consumer Thread "+to_string(i), rq);pthread_create(c + i, nullptr, Consumer, message);messages.push_back(message);}// 等待子线程for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}// 释放资源for (auto message : messages){delete message;}delete rq;return 0;
}

四、结语
今天的分享到这里就结束啦!如果觉得文章还不错的话,可以三连支持一下,春人的主页还有很多有趣的文章,欢迎小伙伴们前去点评,您的支持就是春人前进的动力!

相关文章:
【Linux取经路】基于信号量和环形队列的生产消费者模型
文章目录 一、POSIX 信号量二、POSIX 信号量的接口2.1 sem_init——初始化信号量2.2 sem_destroy——销毁信号量2.3 sem_wait——等待信号量2.4 sem_post——发布信号量 三、基于环形队列的生产消费者模型3.1 单生产单消费模型3.2 多生产多消费模型3.3 基于任务的多生产多消费模…...
计算机SCI期刊,中科院2区,收稿范围非常广泛!
一、期刊名称 Journal of Web Semantics 二、期刊简介概况 期刊类型:SCI 学科领域:计算机科学 影响因子:2.5 中科院分区:2区 出版方式:开放出版 版面费:$1600 三、期刊征稿范围 《网络语义学杂志》…...
JDK、JRE、编译指令和垃圾回收机制详解
JDK 全称 Java SE Development Kit (Java 开发工具包) JVM虚拟机:Java运行的地方 核心类库:Java提前编好的东西 开发工具: javac,java,jdb,jhat javac:Java编译器,用于将Java源代码编译成Java字节码文件(.class)。 java: java…...
【ARM 嵌入式 C 入门及渐进 6.2 -- ARMv8 C 内嵌汇编读系统寄存器的函数实现】
请阅读【嵌入式开发学习必备专栏】 文章目录 ARMv8 C 内嵌汇编读系统寄存器 ARMv8 C 内嵌汇编读系统寄存器 要在ARMv8架构中通过C代码和内嵌汇编来读取系统寄存器s3_0_c15_c5_5的值,并将其返回,可以按照以下方式实现system_read_reg函数: #…...
使用 LlamaParse 进行 PDF 解析并创建知识图谱
此 Python 笔记本提供了有关利用 LlamaParse 从 PDF 文档中提取信息并随后将提取的内容存储到 Neo4j 图形数据库中的综合指南。本教程在设计时考虑到了实用性,适合对文档处理、信息提取和图形数据库技术感兴趣的开发人员、数据科学家和技术爱好者。 该笔记本电脑的主…...
Oracle行迁移解析
行迁移(Row Migration)是Oracle数据库中的另一个现象,它与行链接类似,都是由于数据行大小的变化导致的存储问题,但其本质和影响有所不同。 触发条件:行迁移发生在当一个已存在的、原先能够完全存储在一个数…...
【k8s】 busybox镜像、挂载volume卷
1. 概述 busybox是一个包含了nslookup,ping,wget等网络处理命令的Pod容器(不含curl命令),它的体积非常小,适合做一些容器内的网络调试。 即创建一个docker ,进去执行 ping 命令等 2. 启动容器 2.1 会自动退出&…...
文本三剑客之 sed 编辑器
一.sed 概述 1.sed 介绍 sed是一种流编辑器,流编辑器会在编辑器处理数据之前基于预先提供的一组规则来编辑数据流。 sed编辑器可以根据命令来处理数据流中的数据,这些命令要么从命令行中输入,要么存储在一个 命令文本文件中。 2.sed 的工…...
【MySQL精通之路】SQL优化(1)-查询优化(3)-索引合并
主博客: 【MySQL精通之路】SQL优化(1)-CSDN博客 上一篇: 【MySQL精通之路】SQL优化(1)-查询优化(2)-范围查询优化-CSDN博客 下一篇: 目录 1.索引合并-交集访问算法 2.索引合并联合访问算法 3.索引合并-排序联合访问算法 4.影响索引合…...
Linux中安装配置并使用samba服务(Centos以及Ubuntu)
目录 前言1. 基本知识2. Centos3. Ubuntu3.1 物理服务器3.2 云服务器前言 在window与linux系统中配置一个共享文件夹,可以做很多时间,比如映射器或者像linux中定时存放文件等 1. 基本知识 在Ubuntu上安装和配置Samba服务可以让你的Ubuntu机器与Windows、macOS以及其他Linu…...
three.js能实现啥效果?看过来,这里都是它的菜(06)
这是第五期了,本期继续分享three.js可以实现的3D动画案例,有老铁反馈再发案例的时候,是否可以顺道分享一下three.js的知识点,好吧,安排。 材质动画 材质动画可以实现各种复杂的视觉效果,包括但不限于以下…...
利用ESP32-C3将TF卡内容变成U盘进行读取
利用ESP32-C3将TF卡内容变成U盘进行读取 ESP32-C3是一款高性价比的微控制器,具备WiFi和蓝牙功能,广泛应用于物联网(IoT)项目中。除了常见的无线通信功能外,ESP32-C3还可以用来模拟U盘读取TF卡内容。本文将介绍如何通过…...
C++小病毒
C小病毒(注:对电脑无过大伤害) 短短行,创造奇迹! 把这个文件命名为virus.exe就可以使用了。 #include<bits/stdc.h> #include<windows.h> using namespace std; int main() {HWND hwnd GetForegroundW…...
使用VUE3+TS+elementplus创建一个增加按钮
一、前言 在上一篇文章中分享了创建table的过程,详见(VUE3TSelementplus创建table,纯前端的table),本文在创建好的table的基础上,再创建一个增加按钮。 二、程序展示 1、前面创建table的程序 <templ…...
Python面试宝典:文件读写和上下文管理器以及输入输出流面试题(1000加python面试题助你轻松捕获大厂Offer)
Python面试宝典:1000加python面试题助你轻松捕获大厂Offer【第一部分:Python基础:第八章:文件操作和输入输出:第一节:文件读写和上下文管理器以及输入输出流】 第八章:文件操作和输入输出第一节:文件读写和上下文管理器以及输入输出流1.1、文件读写基本操作1.1.1、打开…...
Spring Boot | Spring Boot 实现 “记住我“ 功能
目录: 一、SpringBoot 中 自定义 "用户授权管理" ( 总体内容介绍 )二、实现 "记住我" 功能 ( 通过 "HttpSecurity类" 的 rememberMe( )方法来实现 "记住我" 功能 ) :2.1 基于 "简单加密 Token" 的方式 ( 实现 "记住我&…...
AGM AG32 MCU替代GD32F407/STM32F407的手持示波器方案
一、AG32 MCU替代GD32F407/STM32F407的数字示波器方案 市场上有多款数字迷你示波器,特别小巧,携带非常方便。单通道,标注100Mhz带宽,500MS采样率。 这样高的采样率需要使用高速AD及很高性能的处理器。原方案使用AD9288双通道8bit …...
MongoDB基础入门到深入(八)MongoDB整合SpringBoot、Chang Streams
文章目录 系列文章索引十五、MongoDB整合SpringBoot1、环境准备2、集合操作3、文档操作(1)相关注解(2)创建实体(3)添加文档(4)查询文档(5)更新文档࿰…...
linux下的nfs概述与实验(openEuler22.03)
目录 什么是NFSNFS工作原理NFS常用文件/etc/exports文件/etc/exports格式 /var/lib/nfs/etab文件 NFS常用选项NFS配置方式1. 安装NFS和RPC2. 启动rpcbind和nfs3. 准备目录4. 配置/etc/exports并重启服务5. 连接测试服务端自我测试客户端测试 补充常用命令永久挂载注意 什么是NF…...
影响Oracle数据库打开速度的因素
当Oracle发起一个事务需要更改数据时,如果所涉及的数据块不在BUFFER CACHE中,那么Oracle服务进程首先会将相关数据块从数据文件中读进BUFFER CACHE进行更改(直接路径读除外),更改后的数据块称之为脏块(DIRT…...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南
🚀 C extern 关键字深度解析:跨文件编程的终极指南 📅 更新时间:2025年6月5日 🏷️ 标签:C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言🔥一、extern 是什么?&…...
MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 🍺 最新版brew安装慢到怀疑人生?别怕,教你轻松起飞! 最近Homebrew更新至最新版,每次执行 brew 命令时都会自动从官方地址 https://formulae.…...
libfmt: 现代C++的格式化工具库介绍与酷炫功能
libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库,提供了高效、安全的文本格式化功能,是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全:…...
规则与人性的天平——由高考迟到事件引发的思考
当那位身着校服的考生在考场关闭1分钟后狂奔而至,他涨红的脸上写满绝望。铁门内秒针划过的弧度,成为改变人生的残酷抛物线。家长声嘶力竭的哀求与考务人员机械的"这是规定",构成当代中国教育最尖锐的隐喻。 一、刚性规则的必要性 …...
MeshGPT 笔记
[2311.15475] MeshGPT: Generating Triangle Meshes with Decoder-Only Transformers https://library.scholarcy.com/try 真正意义上的AI生成三维模型MESHGPT来袭!_哔哩哔哩_bilibili GitHub - lucidrains/meshgpt-pytorch: Implementation of MeshGPT, SOTA Me…...
