【Linux进程篇】进程终章:POSIX信号量线程池线程安全的单例模式自旋锁读者写者问题
W...Y的主页 😊
代码仓库分享 💕
前言:在之前的进程间通信时我们就讲到过信号量,他的本质就是一个计数器,用来描述临界资源的一个计数器。我们当时使用电影院的例子来说明信号量。电影院的座位被我们称为临界资源,只有买到票才能有座位去看电影,申请信号量就是预定买票,申请成功才可以继续往下走下去。而我们看完电影就会释放临界资源,这些资源就可以被别人申请了。
目录
POSIX信号量
基于环形队列的生产消费模型
线程池
STL,智能指针和线程安全
线程安全的单例模式
什么是单例模式
什么是设计模式
单例模式的特点
饿汉实现方式和懒汉实现方式
饿汉方式实现单例模式
懒汉方式实现单例模式
其他常见的各种锁
自旋锁
读者写者问题
读写锁接口
POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
上述就是我信号量的基本操作,现在我们需要实现一个基于环形队列一个生产消费者模型,我们为什么不用上篇博客中的阻塞队列呢。因为阻塞队列我们将其看作一个整体只能进行首尾操作,不能访问队列中间内容。而环形队列我们使用的是vecotor数组进行模拟,可以使用下标进行访问中间内容。
基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
当hend与tail指向同一块位置时,数组可能为满也可能为空,其余的时候可以进行并发访问数组。
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
ringqueue.hpp
#pragma once
#include<iostream>
#include<vector>
#include<semaphore.h>
#include<pthread.h>
using namespace std;
template<typename T>
class RingQueueu
{
private:void P(sem_t& sem){sem_wait(&sem);}void V(sem_t& sem){sem_post(&sem);}void Lock(pthread_mutex_t& mutex){pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t& mutex){pthread_mutex_unlock(&mutex);}
public:RingQueueu(int cap):_cap(cap),_ring_queue(cap){sem_init(&_room_sem,0,_cap);sem_init(&_data_sem,0,0);pthread_mutex_init(&_productor_mutex,nullptr);pthread_mutex_init(&_consumer_mutex,nullptr);}void Enqueue(const T& in){Lock(_productor_mutex);P(_room_sem);_ring_queue[_productor_step++] = in;_productor_step %= _cap;V(_data_sem);Unlock(_productor_mutex);}void Pop(T* out){Lock(_consumer_mutex);P(_data_sem);*out = _ring_queue[_consumer_step++];_consumer_step %= _cap;V(_room_sem);Unlock(_consumer_mutex);}~RingQueueu(){sem_destroy(&_room_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(&_productor_mutex);pthread_mutex_destroy(&_consumer_mutex);}
private:vector<T> _ring_queue;int _cap;int _productor_step = 0;int _consumer_step = 0;sem_t _room_sem;sem_t _data_sem;//加锁,维护多生产多消费pthread_mutex_t _productor_mutex;pthread_mutex_t _consumer_mutex;
};
单生产单消费时可以不用加锁,而多生产多消费时就需要加锁防止同时访问临界资源。
而加锁解锁应该放在申请信号量的后面进行才是比较好的,为什么呢?因为申请信号量是预定机制是原子的,不会出现线程安全问题, 这样可以先预定再等锁,锁来了之后直接运行后面代码,可以提高效率。(等也是等,不如再等的时候申请信号量)
所以这样写更好:
void Enqueue(const T &in){P(_room_sem);Lock(_productor_mutex);_ring_queue[_productor_step++] = in;_productor_step %= _cap;Unlock(_productor_mutex);V(_data_sem);}void Pop(T *out){P(_data_sem);Lock(_consumer_mutex);*out = _ring_queue[_consumer_step++];_consumer_step %= _cap;Unlock(_consumer_mutex);V(_room_sem);}
main.cc
#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include <ctime>
int a = 10;
using namespace ThreadModule;
using namespace std;
using ringqueue_t = RingQueueu<Task>;void PrintHello()
{cout << "hello" << endl;
}
void Consumer(ringqueue_t &rq)
{while (true){Task t;rq.Pop(&t);t();}
}
void Productor(ringqueue_t &rq)
{while (true){sleep(1);rq.Enqueue(Download);}
}
void InitComm(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq, func_t<ringqueue_t> func)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1);threads->emplace_back(func, rq, name);//(*threads)[threads->size()-1].Start();//threads->back().Start();}
}
void InitConsumer(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{InitComm(threads, num, rq, Consumer);
}void InitProductor(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{InitComm(threads, num, rq, Productor);
}
void WaitAllThread(std::vector<Thread<ringqueue_t>> threads)
{for(auto thread : threads){thread.Join();}
}
void StartAll(vector<Thread<ringqueue_t>>& threads)
{for(auto& thread : threads){thread.Start();}
}
int main()
{ringqueue_t *bq = new ringqueue_t(10);std::vector<Thread<ringqueue_t>> threads;InitConsumer(&threads, 1, *bq);InitProductor(&threads, 1, *bq);StartAll(threads);WaitAllThread(threads);return 0;
}
这里使用的也不是原生线程库,而是我们自己封装的thread。上述代码都是完整无误的,但是这里我们要说一个非常隐蔽的问题。我们先将Thread对象放入vector中,再使用StartAll()函数遍历了vector创建了线程,但是为什么我们要分开写,不直接使用threads->back().Start();这个写法呢?
因为线程的创建是跳转到Start函数中去,但是主线程还会继续循环进行,可能下一个Thread即将放入vector中导致vector最后一个内容发生变化,最终某个线程创建失败。这就是一个非常隐讳的并发问题。
线程池
线程池:
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
线程池:
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "Thread.hpp"
#include <vector>
#include"Log.hpp"
using namespace std;
using namespace ThreadModule;const static int defaultthreadnum = 3;
template <typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnlockQueue(){pthread_mutex_unlock(&_mutex);}void ThreadSleep(){pthread_cond_wait(&_cond, &_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadWakeAll(){pthread_cond_broadcast(&_cond);}
public:ThreadPool(int threadnum = defaultthreadnum): _threadnum(threadnum){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);LOG(INFO, "ThreadPool Construct()");}void InitThreadPool(){for (int num = 0; num < _threadnum; num++){string name = "thread-" + to_string(num + 1);// _threads.emplace_back(Print, name);_threads.emplace_back(bind(&ThreadPool::HandlerTask, this, placeholders::_1), name);LOG(INFO, "init thread %s done", name.c_str());}_isrunning = true;}void HandlerTask(string name){LOG(INFO, " %s is running ...", name.c_str());while (true){LockQueue();while (_task_queue.empty() && _isrunning){waitnum++;ThreadSleep();waitnum--;}if(_task_queue.empty() && !_isrunning){UnlockQueue();break;}T t = _task_queue.front();_task_queue.pop();UnlockQueue();LOG(DEBUG, "%s get a task", name.c_str());t();LOG(DEBUG, "%s hander a task result: %s", name.c_str(), t.ResultToString().c_str());}}bool Enqueue(const T &t){bool ret = false;LockQueue();if (_isrunning){_task_queue.push(t);if (waitnum > 0){ThreadWakeup();}LOG(DEBUG, "enqueue task success");ret = true;}UnlockQueue();return ret;}void Stop(){LockQueue();_isrunning = false;ThreadWakeAll();UnlockQueue();}void Start(){for (auto &thread : _threads){thread.Start();}}void Wait(){for (auto &thread : _threads){thread.Join();LOG(INFO, "%s is quit", thread.name().c_str());}}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _threadnum;vector<Thread> _threads;queue<T> _task_queue;pthread_mutex_t _mutex;pthread_cond_t _cond;int waitnum = 0;bool _isrunning = false;
};
我们创建一个日志Log.hpp
#pragma once#include <iostream>
#include <fstream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "LockGuard.hpp"bool gIsSave = false;
const std::string logname = "log.txt";// 1. 日志是由等级的
enum Level
{DEBUG = 0,INFO,WARNING,ERROR,FATAL
};void SaveFile(const std::string &filename, const std::string &message)
{std::ofstream out(filename, std::ios::app);if (!out.is_open()){return;}out << message;out.close();
}std::string LevelToString(int level)
{switch (level){case DEBUG:return "Debug";case INFO:return "Info";case WARNING:return "Warning";case ERROR:return "Error";case FATAL:return "Fatal";default:return "Unknown";}
}std::string GetTimeString()
{time_t curr_time = time(nullptr);struct tm *format_time = localtime(&curr_time);if (format_time == nullptr)return "None";char time_buffer[1024];snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",format_time->tm_year + 1900,format_time->tm_mon + 1,format_time->tm_mday,format_time->tm_hour,format_time->tm_min,format_time->tm_sec);return time_buffer;
}pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 2. 日志是由格式的
// 日志等级 时间 代码所在的文件名/行数 日志的内容
void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{std::string levelstr = LevelToString(level);std::string timestr = GetTimeString();pid_t selfid = getpid();char buffer[1024];va_list arg;va_start(arg, format);vsnprintf(buffer, sizeof(buffer), format, arg);va_end(arg);std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +"[" + std::to_string(selfid) + "]" +"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";LockGuard lockguard(&lock);// pthread_mutex_lock(&lock);if (!issave){std::cout << message;}else{SaveFile(logname, message);}// pthread_mutex_lock(&lock); // bug??// std::cout << levelstr << " : " << timestr << " : " << filename << " : " << line << ":" << buffer << std::endl;
}// C99新特性__VA_ARGS__
#define LOG(level, format, ...) \do \{ \LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__); \} while (0)#define EnableFile() \do \{ \gIsSave = true; \} while (0)
#define EnableScreen() \do \{ \gIsSave = false; \} while (0)
其中日志中的锁是我们使用RALL思想管理的锁。
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__#include <iostream>
#include <pthread.h>class LockGuard
{
public:LockGuard(pthread_mutex_t *mutex):_mutex(mutex){pthread_mutex_lock(_mutex); // 构造加锁}~LockGuard(){pthread_mutex_unlock(_mutex);}
private:pthread_mutex_t *_mutex;
};#endif
同样我们使用的Thread是分装后的接口。
Main.cc
#include "ThreadPool.hpp"
#include <iostream>
#include <vector>
#include "Task.hpp"
#include <string>
#include "Log.hpp"
#include <unistd.h>
#include <memory>
#include <ctime>
using namespace std;int main()
{srand(time(nullptr) ^ getpid() ^ pthread_self());EnableScreen();// EnableFile();unique_ptr<ThreadPool<Task>> tp = make_unique<ThreadPool<Task>>(5);tp->InitThreadPool();tp->Start();int tasknum = 10;while (tasknum){int a = rand() % 10 + 1; usleep(1234);int b = rand() % 5 + 1;Task t(a, b);LOG(INFO, "main thread push task: %s", t.DebugToString().c_str());tp->Enqueue(t);sleep(1);tasknum--;}tp->Stop();tp->Wait();// LogMessage(DEBUG, "helloworld");return 0;
}
以上是线程池的部分代码,线程池的思想与生产消费者模型相似,需要用vector来存放线程,使用queue来存放任务。使用智能指针来控制线程池。
STL,智能指针和线程安全
STL中的容器是否是线程安全的?
不是.
原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.
而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).
因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.
智能指针是否是线程安全的?
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.
线程安全的单例模式
什么是单例模式
单例模式是一种 "经典的, 常用的, 常考的" 设计模式.
什么是设计模式
IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式
单例模式的特点
某些类, 只应该具有一个对象(实例), 就称之为单例.
例如一个男人只能有一个媳妇.
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据.
饿汉实现方式和懒汉实现方式
[洗完的例子]
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度.
饿汉方式实现单例模式
template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};
只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例.
懒汉方式实现单例模式
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};
存在一个严重的问题, 线程不安全.
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
但是后续再次调用, 就没有问题了.
这两种方法在结果是相同的,但是在最开始使用时,饿汉模式是空间换时间,懒汉模式是时间换空间。
我们对上述的线程池代码进行修改让其也可以实现懒汉模式下的单例模式。
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "Thread.hpp"
#include <vector>
#include "Log.hpp"
using namespace std;
using namespace ThreadModule;const static int defaultthreadnum = 3;
template <typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnlockQueue(){pthread_mutex_unlock(&_mutex);}void ThreadSleep(){pthread_cond_wait(&_cond, &_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadWakeAll(){pthread_cond_broadcast(&_cond);}ThreadPool(int threadnum = defaultthreadnum) : _threadnum(threadnum), _waitnum(0), _isrunning(false){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);LOG(INFO, "ThreadPool Construct()");}void InitThreadPool(){// 指向构建出所有的线程,并不启动for (int num = 0; num < _threadnum; num++){std::string name = "thread-" + std::to_string(num + 1);_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);LOG(INFO, "init thread %s done", name.c_str());}_isrunning = true;}void Start(){for (auto &thread : _threads){thread.Start();}}void HandlerTask(std::string name) // 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用!{LOG(INFO, "%s is running...", name.c_str());while (true){// 1. 保证队列安全LockQueue();// 2. 队列中不一定有数据while (_task_queue.empty() && _isrunning){_waitnum++;ThreadSleep();_waitnum--;}// 2.1 如果线程池已经退出了 && 任务队列是空的if (_task_queue.empty() && !_isrunning){UnlockQueue();break;}// 2.2 如果线程池不退出 && 任务队列不是空的// 2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后在退出// 3. 一定有任务, 处理任务T t = _task_queue.front();_task_queue.pop();UnlockQueue();LOG(DEBUG, "%s get a task", name.c_str());// 4. 处理任务,这个任务属于线程独占的任务t();LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());}}// 复制拷贝禁用ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;ThreadPool(const ThreadPool<T> &) = delete;public:static ThreadPool<T> *GetInstance(){// 如果是多线程获取线程池对象下面的代码就有问题了!!// 只有第一次会创建对象,后续都是获取// 双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全if (nullptr == _instance) // 保证第二次之后,所有线程,不用在加锁,直接返回_instance单例对象{LockGuard lockguard(&_lock);if (nullptr == _instance){_instance = new ThreadPool<T>();_instance->InitThreadPool();_instance->Start();LOG(DEBUG, "创建线程池单例");return _instance;}}LOG(DEBUG, "获取线程池单例");return _instance;}bool Enqueue(const T &t){bool ret = false;LockQueue();if (_isrunning){_task_queue.push(t);if (_waitnum > 0){ThreadWakeup();}LOG(DEBUG, "enqueue task success");ret = true;}UnlockQueue();return ret;}void Stop(){LockQueue();_isrunning = false;ThreadWakeAll();UnlockQueue();}void Wait(){for (auto &thread : _threads){thread.Join();LOG(INFO, "%s is quit", thread.name().c_str());}}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _threadnum;vector<Thread> _threads;queue<T> _task_queue;pthread_mutex_t _mutex;pthread_cond_t _cond;int _waitnum;bool _isrunning = false;static ThreadPool<T> *_instance;static pthread_mutex_t _lock;
};template <typename T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;template <typename T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;
我们一定要注意懒汉模式的线程安全问题,因为单例是在不创建对象的前提通过调用函数来实现的,函数是不可重入的所以在多线程并发访问时可能出现执行多次函数导致创建多个单例而违背单例初衷。所以我们要在函数中加锁来保护。
其他常见的各种锁
悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
自旋锁:自旋锁(Spinlock)是一种用于多线程同步的锁机制,它与互斥锁(mutex)在某些方面相似,但有一个关键的区别:当一个线程尝试获取一个已经被其他线程持有的自旋锁时,该线程不会立即阻塞(即不会进入睡眠状态),而是在当前位置“自旋”,也就是循环等待,直到锁被释放。
自旋锁
我们从自旋锁的定义就可以看出自旋锁与互斥锁mutex唯一区别是线程是否需要阻塞等待,而这就取决于申请到锁的线程在临界区执行时长的问题。如果时间比较就我们就要使用mutex挂起等待,反之可以使用自旋锁spinlock一直去申请访问。
自旋锁的接口与互斥锁大相径庭,我们来学习一下:
使用时我们只需要定义一个pthread_spinlock_t对象即可。
无论是自旋锁还是互斥锁都需要我们程序员去判断然后使用,正确使用可以提高效率。反之无条件使用自旋锁可能会将CPU打满死机!!!
读者写者问题
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
什么是读者写者问题,这个与生产消费者模型有极大相似性。举个例子,我们写CSDN文章,出黑板报等待都是读者写者问题。写者将内容发送到一个临界区,读者只需要读即可。所以本质也是321原则。一个交易场所、两个角色:读者、写者,三种关系:读者VS读者、写者VS写者、读者VS写者。
其中读者与写者一定有互斥和同步的关系,写者与写者之间有互斥的关系。但是读者与读者之间却没有关系,这与生产消费者模型就有不同之处。因为消费者是需要将临界区的数据拿走,而读者只需要拷贝数据,不会讲数据拿走,这就导致读者之间没有关系。
这种关系自己使用加锁解锁逻辑是没问题的,下面有一段伪代码就是整体思路:
int reader_count = 0; //读者计数器
pthread_mutex_t wlock; //写者锁
pthread_mutex_t rlock; //读者锁//读者
lock(&rlock);
if(reader_count == 0)lock(&wlock);
++reader_count;
unlock(&rlock);//读者读操作lock(&rlock);
--reader_count;
if(reader_count == 0)unlock(&wlock);
unlock(&rlock);//写者lock(&wlock);//写者写操作unlock(&wlock);
写者思路非常简单,只需要维护只有一个写者进入写即可。因为读者计数器也属于临界资源我们要加锁保护。当我们判断读者为0时证明是第一个读者进入,所以我们要申请写者锁防止写者进入,如果我们申请不到锁证明写者持有锁读者进入不了,这就做到了互斥与同步。后面当最后一个--后计数器为0证明是最后一个走的读者,里面没有读者了就可以释放写者锁了。
通过上述伪代码我们可以看出,其读者写者问题都是围绕读者优先实现的,所以当读者基数过大时肯定会导致写者饥饿问题。所以会出现读者优先(上述伪代码逻辑)与写者优先,写者优先的思路就是当写者到来时,我们会阻挡还未进入的读者,当已进入的读者全部出来时写者先进入!!!但是实现就有点复杂,这里我们不给予实现。
为了不这么使用,pthread库提供了读写锁,而上述问题就会在库中得到解决!
读写锁接口
设置读写优先
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
*/
初始化
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);
销毁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);//读加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);//写加锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);//读写解锁都可以使用
下面代码是一段读写锁模型实例,可以参考怎么使用:
#include <vector>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
volatile int ticket = 1000;
pthread_rwlock_t rwlock;
void * reader(void * arg)
{
char *id = (char *)arg;
while (1) {
pthread_rwlock_rdlock(&rwlock);
if (ticket <= 0) {
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
void * writer(void * arg)
{
char *id = (char *)arg;
while (1) {
pthread_rwlock_wrlock(&rwlock);
if (ticket <= 0) {
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, --ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
struct ThreadAttr
{
pthread_t tid;
std::string id;
};
std::string create_reader_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread reader ", std::ios_base::ate);
oss << i;
return oss.str();
}
std::string create_writer_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread writer ", std::ios_base::ate);
oss << i;
return oss.str();
}
void init_readers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i) {
vec[i].id = create_reader_id(i);
pthread_create(&vec[i].tid, nullptr, reader, (void *)vec[i].id.c_str());
}
}
void init_writers(std::vector<ThreadAttr>& vec)
{
for (std::size_t i = 0; i < vec.size(); ++i) {
vec[i].id = create_writer_id(i);
pthread_create(&vec[i].tid, nullptr, writer, (void *)vec[i].id.c_str());
}
}
void join_threads(std::vector<ThreadAttr> const& vec)
{
// 我们按创建的 逆序 来进行线程的回收
for (std::vector<ThreadAttr>::const_reverse_iterator it = vec.rbegin(); it !=
vec.rend(); ++it) {
pthread_t const& tid = it->tid;
pthread_join(tid, nullptr);
}
}
void init_rwlock()
{
#if 0 // 写优先
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlock_init(&rwlock, &attr);
pthread_rwlockattr_destroy(&attr);
#else // 读优先,会造成写饥饿
pthread_rwlock_init(&rwlock, nullptr);
#endif
}
int main()
{
// 测试效果不明显的情况下,可以加大 reader_nr
// 但也不能太大,超过一定阈值后系统就调度不了主线程了
const std::size_t reader_nr = 1000;
const std::size_t writer_nr = 2;
std::vector<ThreadAttr> readers(reader_nr);
std::vector<ThreadAttr> writers(writer_nr);
init_rwlock();
init_readers(readers);
init_writers(writers);
join_threads(writers);
join_threads(readers);
pthread_rwlock_destroy(&rwlock);
}
以上就是本次全部内容,感谢大家观看。
相关文章:

【Linux进程篇】进程终章:POSIX信号量线程池线程安全的单例模式自旋锁读者写者问题
W...Y的主页 😊 代码仓库分享 💕 前言:在之前的进程间通信时我们就讲到过信号量,他的本质就是一个计数器,用来描述临界资源的一个计数器。我们当时使用电影院的例子来说明信号量。电影院的座位被我们称为临界资源&a…...

MathType7.5破解版下载安装激活图文详细教程(附激活秘钥)
🌟 引言:揭秘MathType,数学编辑的瑞士军刀! 嘿,各位小伙伴,今天我要给你们安利一个我超级喜欢的数学神器——MathType!如果你跟我一样,在处理数学公式时常常感到头疼,那你…...

2-62 基于MATLAB gui 编制短波通信系统
基于MATLAB gui 编制短波通信系统,录制一段语音信号,分别通过AM SSB DSB 等调制信号,加入噪声,然后解调出来,可比较各种调制解调方式的优劣。程序已调通,可直接运行。 2-62 matlab gui - 小红书 (xiaohongs…...
windows C++-C++/WinRT 中创建组件和事件(下)
跨 ABI 的简单信号 如果无需连同事件传递任何形参或实参,则可以定义自己的简单 Windows 运行时委托类型。 以下示例展示 Thermometer 运行时类的更简易版本。 它声明名为 SignalDelegate 的委托类型,然后使用该类型来引发信号类型事件,而不是…...

C++初学者指南-5.标准库(第二部分)--二叉堆操作
C初学者指南-5.标准库(第二部分)–二叉堆操作 文章目录 C初学者指南-5.标准库(第二部分)--二叉堆操作背景什么是“堆”二叉最大堆二叉树的表示 堆操作C标准库中的堆初始化堆收缩堆增长堆 辅助操作sort_heap (Heap → Sorted Array)is_heapis_heap_until 相关内容 不熟悉 C 的标…...
在Ubuntu 16.04上安装Git的方法
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 简介 在现代软件开发中,一个不可或缺的工具是某种版本控制系统。版本控制系统允许您在源代码级别跟踪软件。您可以跟踪更改…...

redis内存淘汰策略-------Reservoir Sampling(水库采样)
文章目录 过期删除策略和内存淘汰策略内存淘汰策略evictionPoolEntryevictionPoolPopulate Reservoir SamplingdictGetRandomKeydictGetSomeKeysReservoir Samplingchatgpt对Reservoir Sampling的介绍 过期删除策略和内存淘汰策略 详细介绍请参考博客“redis过期删除策略和内存…...

C++《类和对象》(上)
在之前的C入门基础知识中我们了解了C的发展过程已经重要性,还初步了解了C中一些相比C语言特有的知识点,例如命名空间、缺少参数、函数重载、引用等,接下来在本篇中我们将开始C整个体系中非常重要的一个知识章节——类和对象,类和对…...

LLM大语言模型算法特训
百度 LLM(Large Language Model)大语言模型算法特训是一个深度学习领域的高级培训项目,专门设计用于训练和优化大规模语言模型的开发者和研究人员。本文将详细探讨LLM算法的基本原理、训练技术、应用领域以及参与者可以预期的学习收获和挑战。…...
Docker相关笔记
Docker笔记 1. Dockerfile编译构建docker Dockerfile 是一个文本文件,包含了构建 Docker 镜像的所有指令。 Dockerfile 常用的有如下关键字: FROM:指定基础镜像,后续定制操作都是基于这个基础镜像,比如: …...

前端技术day01-HTML入门
一、前端介绍 技术描述HTML用于构建网站的基础结构的CSS用于美化页面的,作用和化妆或者整容作用一样JS实现网页和用户的交互Vue主要用于将数据填充到html页面上的Element主要提供了一些非常美观的组件 二、工具软件 VsCode 在前端领域,有一个公认好用…...

Multisim 用LM358 运放模拟线性稳压器 - 运放输出饱和 - 前馈电容
就是拿运放搭一个可调的LDO 稳压器,类似下面这个功能框图里的感觉。本来应该非常简单,没什么好说的,没想到遇到了两个问题。 原理 - 理想运放 我用PNP 三极管Q2 作为输出,运放输出电压升高时,流过PNP 三极管BE 的电流变…...

宁德大屏第二版总结
碰到难点 1.wss 心跳机制 实现前端和后端双向绑定 只要后端发送了消息 前端通过全局总线去触发你想要的函数。 全局总线 vue3可以全局总线下一个mitt 新建一个eventBus.js import mitt from "mitt"; const eventBus mitt();export default eventBus; 然后wss…...
冥想第一千二百四十七天(1247)
1.今天上午带桐桐去游泳了,买了卡吉诺,吃过最好吃的甜点。推荐。还有鸡排。 2.回来后带着媳妇,先加油。去给丈母娘看腿,等丈母娘等了好久,还帮她推车。 3.回来后,在丈母娘家跑步。很舒服。家长麦田的香味。…...

基于光学动捕定位下的Unity-VR手柄交互
Unity VR 场景手柄交互实现方案 需求 在已创建好的 Unity VR 场景中,接入游戏手柄,通过结合动捕系统与 VRPN,建立刚体,实时系统获取到手柄的定位数据与按键数据,通过编写代码实现手柄的交互逻辑,实现手柄…...
php json_decode 带反斜杠字符串json解析
PHP json_decode 带反斜杠字符串json解析 今天再次遇到了json字符串中包含反斜杠的问题,记录下解决方法 在JSON字符串中,反斜杠\用作转义字符。当JSON_UNESCAPED_SLASHES选项被用于json_encode()函数时,不会在slashes前面添加反斜杠。 但是…...

【NLP】文本张量表示方法【word2vec、词嵌入】
文章目录 1、文本张量表示2、one-hot词向量表示2.1、one-hot编码代码实现:2.2、onehot编码器的使用2.3、one-hot编码的优劣势 3、word2vec模型3.1、模型介绍3.2、CBOW模式3.3、skipgram模式3.4、word2vec的训练和使用3.4.1、获取训练数据3.4.2、训练词向量3.4.3、查…...

疯狂Java讲义_08_泛型
文章目录 泛型的传参若函数里的参数使用基类接受所有的派生类,怎么做? 类型通配符的上限类型通配符的下限 泛型的传参 注意 若类 Base 是类 Derived 的基类(父类),那么数组类型 Base[] 是 Derived[] 的基类࿰…...

HCIA、OSPF笔记
一、OSI参考模型 1、OSI的结构 应用层:把人类语言转化成编码,为各种应用程序提供网络服务。 表示层:定义一些数据的格式,(对数据进行加密、解密、编码、解码、压缩、解压缩,每一层都可以实现,…...
Python删除lru_cache缓存
在 Python 中,lru_cache 是一个装饰器,用于添加缓存功能以提高函数的性能。如果你想清除或者删除 lru_cache 中的缓存,有几种方法可以做到: 手动清除缓存: lru_cache 对象有一个方法叫做 cache_clear(),可以手动清除所有缓存。示例:@lru_cache(maxsize=128) def some_fun…...

【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...

Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
可靠性+灵活性:电力载波技术在楼宇自控中的核心价值
可靠性灵活性:电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中,电力载波技术(PLC)凭借其独特的优势,正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据,无需额外布…...

前端导出带有合并单元格的列表
// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...

Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

uniapp手机号一键登录保姆级教程(包含前端和后端)
目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号(第三种)后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...
Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成
一个面向 Java 开发者的 Sring-Ai 示例工程项目,该项目是一个 Spring AI 快速入门的样例工程项目,旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计,每个模块都专注于特定的功能领域,便于学习和…...
Windows 下端口占用排查与释放全攻略
Windows 下端口占用排查与释放全攻略 在开发和运维过程中,经常会遇到端口被占用的问题(如 8080、3306 等常用端口)。本文将详细介绍如何通过命令行和图形化界面快速定位并释放被占用的端口,帮助你高效解决此类问题。 一、准…...