当前位置: 首页 > news >正文

C++轻量级 线程间异步消息架构(向曾经工作的ROSA-RB以及共事的DOPRA的老兄弟们致敬)

1 啰嗦一番背景

这么多年,换着槽位做牛做马,没有什么钱途

手艺仍然很潮,唯有对于第一线的码农工作,孜孜不倦,其实没有啥进步,就是在不断地重复,刷熟练度,和同期的老兄弟们,那是千万不要有比较的想法,Q一把说各有各的活路。

业务线,技术线,管理线,通通都无成就。

唯有在一亩三分地上,辛勤耕耘,有了点成绩,聊以慰藉

拉回正题

当前部门的项目团队,并非纯粹嵌入式开发,偏向于ARM高性能芯片+unbuntu模式,中位机的模块代码加上UT code,前前后后加起来,11W行了,正式代码估计就4W左右,其他都工具或者UT。

UT的改善也是从加入部门后开始加速的,算是起到一定带领作用,新特性的merge同时,也要满足行覆盖率(分支覆盖率没有硬性要求,代码设计的够好,少一些if else就OK)。

目前软件中,一个比较大的问题是组件间只有API接口,一路开发下来,各个组件的API调用互相交织,解耦困难,千辛万苦将UT覆盖刷到超80个点,mock遍布,带来的副作用是UT文件的.o超大。

编出来的执行文件,超过1个多G,Windows上跑GCC交叉编译会OOM,只有VC+linux 两个环境上的UT可以编出来。

其实陷入了瓶颈。

接下来进入新一轮新特性开发期,老的架构已经到了差不多不得不优化的程度。

算是在当前团队的最后一个心愿吧,这不,解决方案除了在业务流程优化,软件架构改进上(主要就是SOLID原则),需要引入轻量级线程间异步消息架构。

并非线程池,单纯的异步消息,事件,定时器这三种基本的功能。

Java环境下,这都不是事,C++环境下,搜索了一遍,没有现成的。

boost有异步消息架构和定时器,简单的也玩过,感觉上我们用起来有些门槛,随着C++新标准不断引入新特性新功能,boost对于中小型代码,反而没有特别吃香的感觉。

Nokia开源的一款 event machine,牛逼是牛逼,门槛也高,代码行数都快抵得上我们当前的项目的一小半了,弃疗。

开源的一些找来找去,在简洁上,差点意思。

另外,线程间异步消息架构,是几乎所有稍微大型的一点的软件的刚需。

关键的关键,是没有现成的,这一点不得不口出脏话,tm什么年代了,这玩意还需要敝帚自珍,当个宝,就不能分享出来,让广大的农民工朋友有一个基本的参考或者可以鄙视的模版?

要自己开发了,回忆起了10多年前,Dopra的FID/PID初始化+消息框架的架构。

琢磨了一段时间,开动。

2 简单介绍线程间异步消息框架的概念

如下的讲述,懒得画图了,到处都可以搜索到,画出来也是知识垃圾。

a 异步消息很简单,

    1对1,C + message broker(桥接)+ C

b 事件,为 1对N(0...:n),发布和订阅模型

    P(publisher) + event broker + S(subscribers)

c 定时器

不搞callback这一套,和起临时线程(异步任务)没有太大区别

T(trigger) + timer engine + user(handler)

上述三种,全部共享一个handler(FID/PID框架中的MsgProc)

message broker + event broker + timer engine 归属到一个全局功能实例,齐活。

基础的架构设计完毕后,开始猛起,要做一个一天赚1000妹币的男人(混到最惨了有木有)

圈里面看到有老哥们6月27号(周四)拿了最后一次加班夜宵,岁月不饶人,一个有生命力的团队必然是一个不断推陈出新的团队。

老司机呆长的,不到一定层级,慢慢就成为重点优化对象。当时也是年轻气盛,13年拿了个杭研十佳程序员,年底被领导平衡绩效C,果断离了。

历程转了个大弯,这不,Dopra的一套消息架构,真香,试着东施效颦了。

3 和Dopra框架的一点区别

不出意外的必然有很大的区别,咱这个是缩水版(就开发了4天,连蒙带猜)

1 受限于个人水平,怎么简单怎么来,元编程玩不来,羞愧。

2 也没有单独的消息内存管理那一套(如果是大型项目,视情况而言,想要做好内存操作隔离,那还是需要的,memory sandbox概念也早就有了,有需要就加上)

消息也不用老式的 VOS_MsgHeader + paylaod这一套来管理(踩内存尾记忆尤深),通通简化,当前的项目并不需要多个coordinator之间搞起,咱就是单体。

用最土的union来描述基本消息结构(想要玩的开,搞成container的话,那还有很多备选,比如同OS内部 IPC,zmq,消息protobuf封装),想要怎么玩,几乎都玩得起了现在。

3 Dopra的FID/PID还负责启动时初始化(也可以参考RT-Thread一套初始化方式),笔者其实也准备用上,没有啥技术上的巧,接口预留,各位看官可以酌情自己添加。

当然得当然,都是tm指针在管理内存,修改上完全开放的。

10年了,该忘的不该忘的,差不多忘记,伴随着年级增加的,只有体重。

4 干货分享

声明: 笔者也是受益于互联网,各位看官下载源码后,可自由分发和修改,笔者天然放弃著作权(有部分源码参考自github  cpptime/cpptime.h at master · eglimi/cpptime · GitHub)。

笔者当前分享的是初稿,只跑了几个UT case,后续看情况,待完善后会更新。

笔者相信,当前的代码离商业级别应该有一些差距,给大家带来的不便也敬请原谅,就开发了4天,中间还不断给各种杂事打断。

4.1 源代码目录

.
├── include
│   ├── VOS_Def.h
│   ├── VOS_IHandler.h
│   ├── VOS_Interface.h
│   └── private
│       ├── VOS_InterfaceImpl.h
│       ├── VOS_MsgQueue.h
│       └── VOS_TimerImpl.h
├── src
│   ├── VOS_IHandler.cpp
│   ├── VOS_InterfaceImpl.cpp
│   └── VOS_TimerImpl.cpp
└── tst└── VOS_IHandler_test.cpp

4.2 头文件介绍

4.2.1 VOS_Def.h

该文件由用户定制(有部分基础结构建议不要修改,比如VosMessage这种),比如消息基础结构,消息ID,timer 类型,event id以及对应的携带参数

#ifndef VOS_DEF_H_
#define VOS_DEF_H_
#include <chrono>namespace VOS
{enum class VOS_MessageType
{VOS_SYNC_MSG,VOS_TIMER,VOS_EVENT,VOS_BOTTOM = 255
};enum class VOS_CompName
{COMP_A = 0,COMP_B = 1,COMP_C = 2,COMP_D = 3,COMP_E = 4,COMP_F = 5,COMP_AR = 6,COMP_RED = 7,COMP_Bottom = 8 // the last one, if you add new one,COMP_Bottom must increase
};// user need define own EventID
enum class VOS_EventID
{SUB_SAHA_INIT_START = 0,SUB_SAHA_INIT_READY = 1,EVENT_SAHA_SOMETHING_OK,EVENT_SAHA_SOMETHING_FAIL};struct REP_ID_DATA
{int programId;int assayNumber;int repIdx;
};typedef union
{REP_ID_DATA      repData;int              reserve_1;
} VOSEventArg;enum class VOSTimerType
{VOSTimerType_1,VOSTimerType_2,VOSTimerType_3,VOSTimerType_BOTTOM = 255
};struct VOS_TIMER_MSG
{VOSTimerType timerType;
};/*
message id
VOS_ENV_XXX_REQ
VOS_ENV_XXX_RESP
*/struct VOS_SYNC_MSG
{int msgId;//add your own structure
};struct VOS_EVENT_MSG
{VOS_EventID eventId;VOSEventArg arg;
};/** 消息中携带的结构可能多种多样,要兼容各种结构在实现上难度太大* 这里的轻量级消息结构,基本元素为一段固定大小的内存块,选用union结构* union结构有限制,不同编译器可能不一样,最好在基层结构中不要用用户自定义的构造函数* 可参看 https://learn.microsoft.com/zh-cn/cpp/cpp/unions?view=msvc-170** union的一大特征在于,一个Union类中的所有数据共享同一段内存。* 如果union类的成员包含自己的构造函数,析构函数,* 那么同一union类的成员在初始化时,就有可能会执行不同的构造函数。* 这是无法预料的。所以,我们在定义union类时要尽量避免成员变量是对象(含有自己的构造函数)* 子结构中,最好不要包含自定义的一些类结构* 看下面 std::string 的报错* 注: 如果当前的编译器支持C++17以及以上的版本,则用std::variant, 类型安全有保证,也完全OK,*/
typedef union
{VOS_SYNC_MSG      xSyncMsg;VOS_TIMER_MSG     xTimerMsg;VOS_EVENT_MSG     xEventMsg;//std::string       test; error C2280: VOS::VosMsgData::VosMsgData(void)”: 尝试引用已删除的函数
} VosMsgData;struct VosMessage
{VosMessage(VOS_MessageType msgType) :msgType_(msgType){}VOS_MessageType   msgType_;VosMsgData        msgData_;
};using timer_id = std::size_t;
using clock = std::chrono::steady_clock;
using timestamp = std::chrono::time_point<clock>;}#endif // VOS_DEF_H_

4.2.2 VOS_Interface.h

API 总入口,单实例,getInstance()方式调用,笔者懒得private构造函数,形式而已

设计上不完美,这两个接口是内部框架调用的,这里也暴露给了用户

    virtual void VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;virtual void VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;

#ifndef VOS_INTERFACE_H_
#define VOS_INTERFACE_H_
#include "VOS_Def.h"
#include <functional>namespace VOS
{class VOS_Interface
{
public:explicit VOS_Interface() = default;virtual ~VOS_Interface() = default;VOS_Interface(const VOS_Interface&) = delete;VOS_Interface(VOS_Interface&&) = delete;VOS_Interface& operator=(const VOS_Interface&) = delete;VOS_Interface& operator=(VOS_Interface&&) = delete;virtual void VOS_SendMsg(VOS_CompName name, VosMessage &message) = 0;virtual void VOS_PublishEvent(VOS_EventID eventId, const VOSEventArg& arg) = 0;virtual void VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;virtual void VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;virtual void VOS_RegisterInitCB(VOS_CompName name) = 0;using MsgHandler = std::function<void(const VosMessage &msg)>;// warning: called by inner framework, user should not call thisvirtual void VOS_AddMsgHandler(VOS_CompName name, MsgHandler&& eh) = 0;virtual void VOS_RemoveMsgHandler(VOS_CompName name)  = 0;virtual timer_id StartTimer(VOS_CompName name, VOSTimerType timerType, unsigned int expiredTime, bool isPeriodic = false)= 0;virtual bool Canceltimer(timer_id id)= 0;static VOS_Interface& getInstance();};
}#endif // VOS_INTERFACE_H_

4.2.3 VOS_IHandler.h

每个需要处理消息的组件各自创建一个,参数就是VOS_CompName name

用户基本上只要继承,重载

virtual void handleMessage(const VosMessage &msg) = 0;

初始化完毕调用start() ,参看UT调用,并没有明确限制,可自行修改

void start();

具体如下


#ifndef VOS_IHANDLER_H_
#define VOS_IHANDLER_H_#include <memory>
#include "VOS_Def.h"namespace VOS
{class VOS_IHandler {
public:explicit VOS_IHandler(VOS_CompName name);virtual ~VOS_IHandler() = default;VOS_IHandler() = delete;VOS_IHandler(const VOS_IHandler&) = delete;VOS_IHandler(VOS_IHandler&&) = delete;VOS_IHandler& operator=(const VOS_IHandler&) = delete;VOS_IHandler& operator=(VOS_IHandler&&) = delete;void start();virtual void handleMessage(const VosMessage &msg) = 0;/* void VOS_IHandler::handleMessage(const VosMessage &msg){switch (msg.msgType_){case VOS_MessageType::VOS_TIMER://do somethingbreak;case VOS_MessageType::VOS_BOTTOM:default:break;}}*/protected:class Impl; std::shared_ptr<Impl> pimpl_;
};}
#endif /* VOS_IHANDLER_H_ */

接下来是内部实现

4.2.4 VOS_InterfaceImpl.h

该内部头文件为VOS_Interface.h 接口的具体实现类

#ifndef VOS_INTERFACE_IMPL_H_
#define VOS_INTERFACE_IMPL_H_
#include <memory>
#include <map>
#include <vector>
#include <thread>
#include <mutex>
#include "VOS_Interface.h"
#include "VOS_TimerImpl.h"namespace VOS
{
class VOS_InterfaceImpl : public VOS_Interface
{
public:VOS_InterfaceImpl();virtual ~VOS_InterfaceImpl();VOS_InterfaceImpl(const VOS_InterfaceImpl&) = delete;VOS_InterfaceImpl(VOS_InterfaceImpl&&) = delete;VOS_InterfaceImpl& operator=(const VOS_InterfaceImpl&) = delete;VOS_InterfaceImpl& operator=(VOS_InterfaceImpl&&) = delete;void VOS_SendMsg(VOS_CompName name, VosMessage &message) override;void VOS_PublishEvent(VOS_EventID eventId, const VOSEventArg& arg) override;void VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name) override;void VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name) override;void VOS_RegisterInitCB(VOS_CompName name) override;void VOS_AddMsgHandler(VOS_CompName name, MsgHandler&& eh) override;void VOS_RemoveMsgHandler(VOS_CompName name) override;timer_id StartTimer(VOS_CompName name, VOSTimerType timerType, unsigned int expiredms, bool isPeriodic) override;bool Canceltimer(timer_id id) override;void VOS_SendTimerMsg(VOS_CompName name, VOSTimerType timerType);private:void innerSendMsg(VOS_CompName name, VosMessage &message);std::map<VOS_CompName, MsgHandler> handlers_;std::map<VOS_EventID, std::vector<VOS_CompName>> event_map_;std::shared_ptr<VOS_Timer> timers_;std::mutex interface_lock_;};}#endif // VOS_INTERFACE_IMPL_H_
4.2.5 VOS_MsgQueue.h

为安全队列的基本封装,有些冗余的代码,部分原子变量操作时不完全规范,大家可以用原生的atomic操作自行修改

#ifndef VOS_MSGQUEUE_H_
#define VOS_MSGQUEUE_H_#include <memory>
#include <atomic>
#include <queue>
#include <functional>
#include <iostream>
#include <condition_variable>#include "VOS_Def.h"namespace VOS
{using VOSQueueElementPtr = std::unique_ptr<VosMessage>;
using VOSQueueElements   = std::queue<VOSQueueElementPtr>;using SubscriberCb = std::function<void(const VosMessage &msg )>;class VOS_MsgQueue
{
public:explicit VOS_MsgQueue(){};~VOS_MsgQueue(){notify();};VOS_MsgQueue(const VOS_MsgQueue&) = delete;VOS_MsgQueue(VOS_MsgQueue&&) = delete;VOS_MsgQueue& operator=(const VOS_MsgQueue&) = delete;VOS_MsgQueue& operator=(VOS_MsgQueue&&) = delete;void addSubscriberCb(SubscriberCb Cb){subScriberCb = Cb;}void sendMessage(VOSQueueElementPtr msg){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);// if queue size > MAX_QUEUE_SIZE, that means VOS task might have some issue or might be blocked if(getRunningFlag() && (VOSQueue_messages_.size() < MAX_QUEUE_SIZE)){VOSQueue_messages_.push(std::move(msg));ready = true;lck.unlock();notify();}}void handleMessage(){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);cv.wait(lck, [this]() { return this->ready; });VOSQueueElements tmpVOSQueue;while(VOSQueue_messages_.size()){tmpVOSQueue.push(std::move(VOSQueue_messages_.front()));VOSQueue_messages_.pop();}ready = false;lck.unlock();while(tmpVOSQueue.size()){auto msg = std::move(tmpVOSQueue.front());tmpVOSQueue.pop();//do somethingsubScriberCb(*msg);}}int getQueueSize(){std::unique_lock<std::mutex> lock(VOSQueue_message_lock_);return static_cast<int>(VOSQueue_messages_.size());}void Stop(){{std::lock_guard<std::mutex> lk{VOSQueue_message_lock_};ready = true;}// set atomic flag to true and notify event handler threadnotify();}void clear(){notify();}void notify(){cv.notify_all();}void clearQueue(){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);while(VOSQueue_messages_.size()){VOSQueue_messages_.pop();}interfaceRunningFlag_ = false;}void setRunningFlag(bool flag){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);interfaceRunningFlag_ = flag;}bool getRunningFlag(){return interfaceRunningFlag_;}private:const int MAX_QUEUE_SIZE = 500;VOSQueueElements VOSQueue_messages_;SubscriberCb subScriberCb;std::mutex VOSQueue_message_lock_;std::condition_variable cv;std::atomic<bool> interfaceRunningFlag_ = false;bool ready = false;std::atomic_flag synFlag_ = ATOMIC_FLAG_INIT;
};}#endif // VOS_MSGQUEUE_H_
4.2.6 VOS_TimerImpl.h

这个稍微复杂,参考代码来自网页cpptime/cpptime.h at master · eglimi/cpptime · GitHub


#ifndef VOS_TIMERIMPL_H_
#define VOS_TIMERIMPL_H_#include "VOS_Def.h"
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <stack>
#include <thread>
#include <vector>namespace VOS
{using TimerSubscriberCb = std::function<void(VOS_CompName name, VOSTimerType timerType)>;// Public types
#if 0
using timer_id = std::size_t;
using handler_t = std::function<void(timer_id)>;
using clock = std::chrono::steady_clock;
using timestamp = std::chrono::time_point<clock>;
#endif
using duration = std::chrono::milliseconds;// The event structure that holds the information about a timer.
struct Event
{timer_id id;timestamp start;duration period;VOS_CompName name;VOSTimerType timerType;bool valid;Event(): id(0), start(duration::zero()),period(duration::zero()),name(VOS_CompName::COMP_Bottom),timerType(VOSTimerType::VOSTimerType_BOTTOM),valid(false){}Event(timer_id id,timestamp start,duration period,VOS_CompName name,VOSTimerType timerType): id(id),start(start),period(period),name(name),timerType(timerType),valid(true){}Event(Event &&r) = default;Event &operator=(Event &&ev) = default;Event(const Event &r) = delete;Event &operator=(const Event &r) = delete;
};// A time event structure that holds the next timeout and a reference to its
// Event struct.
struct Time_event
{timestamp next;timer_id ref;
};inline bool operator<(const Time_event &l, const Time_event &r)
{return l.next < r.next;
}class VOS_Timer
{using scoped_m = std::unique_lock<std::mutex>;public:VOS_Timer();~VOS_Timer();// user should register first and then startvoid addSubscriberCb(TimerSubscriberCb Cb){subScriberCb = Cb;}void start(){setRunningFlag(true);threadForTimer();}timer_id StartTimer(VOS_CompName name, VOSTimerType timerType, unsigned int expiredms, bool isPeriodic){return add(expiredms,name,timerType,isPeriodic? expiredms:0);}bool Canceltimer(timer_id id){return remove(id);}/*** Add a new timer.** \param when The time at which the handler is invoked.* \param handler The callable that is invoked when the timer fires.* \param period The periodicity at which the timer fires. Only used for* periodic timers.*/timer_id add(const timestamp &when,  VOS_CompName name, VOSTimerType timerType,const duration &period = duration::zero()){scoped_m lock(m);timer_id id = 0;// Add a new event. Prefer an existing and free id. If none is// available, add a new one.if (free_ids.empty()){id = events.size();Event e(id, when, period, name, timerType);events.push_back(std::move(e));}else{id = free_ids.top();free_ids.pop();Event e(id, when, period, name, timerType);events[id] = std::move(e);}time_events.insert(Time_event{when, id});lock.unlock();cond.notify_all();return id;}/*** Overloaded `add` function that uses a `std::chrono::duration` instead of* a `time_point` for the first timeout.*/template <class Rep, class Period>inline timer_id add(const std::chrono::duration<Rep, Period> &when,VOS_CompName name, VOSTimerType timerType,const duration &period = duration::zero()){return add(clock::now() +std::chrono::duration_cast<std::chrono::milliseconds>(when),name, timerType, period);}/*** Overloaded `add` function that uses a uint64_t instead of a `time_point`* for the first timeout and the period.*/inline timer_id add(const uint64_t when, VOS_CompName name, VOSTimerType timerType,const uint64_t period = 0){return add(duration(when), name, timerType, duration(period));}/*** Removes the timer with the given id.*/bool remove(timer_id id){scoped_m lock(m);if (events.size() == 0 || events.size() <= id){return false;}events[id].valid = false;events[id].name = VOS_CompName::COMP_Bottom;events[id].timerType = VOSTimerType::VOSTimerType_BOTTOM;auto it =std::find_if(time_events.begin(), time_events.end(),[&](const Time_event &te) { return te.ref == id; });if (it != time_events.end()){free_ids.push(it->ref);time_events.erase(it);}lock.unlock();cond.notify_all();return true;}protected:TimerSubscriberCb subScriberCb;void setRunningFlag(bool flag);void release();void timerEngine();void threadForTimer();bool loopCondition();
private: const duration time_wait_step = duration(100);//100ms// Thread and locking variables.std::mutex m;std::condition_variable cond;// The vector that holds all active events.std::vector<Event> events;// Sorted queue that has the next timeout at its top.std::multiset<Time_event> time_events;// A list of ids to be re-used. If possible, ids are used from this pool.std::stack<timer_id> free_ids;std::atomic<bool> runningFlag_ = true;std::thread timer_thread_;};} // namespace VOS#endif /* VOS_TIMERIMPL_H_ */

4.3 src下 cpp实现

4.3.1 VOS_IHandler.cpp

#include <iostream>
#include "VOS_IHandler.h"
#include "VOS_MsgQueue.h"
#include "VOS_Interface.h"namespace VOS
{class VOS_IHandler::Impl {
public:explicit Impl(VOS_CompName name): Impl(name, std::make_shared<VOS_MsgQueue>(), VOS_Interface::getInstance()){}Impl(VOS_CompName name,std::shared_ptr<VOS_MsgQueue> msgQueue,VOS_Interface& vos_instance): name_(name),msgQueue_(msgQueue),vos_instance_(vos_instance){}~Impl(){release();}Impl() = delete;Impl(const Impl&) = delete;Impl(Impl&&) = delete;Impl& operator=(const Impl&) = delete;Impl& operator=(Impl&&) = delete;void sendMsg(const VosMessage &msg){auto element = std::make_unique<VosMessage>(msg);msgQueue_->sendMessage(std::move(element));}void start(SubscriberCb&& Cb){msgQueue_->addSubscriberCb(std::forward<SubscriberCb>(Cb));msgQueue_->setRunningFlag(true);initAsyncMessageTask();vos_instance_.VOS_AddMsgHandler(name_, [=](const VosMessage &msg){this->sendMsg(msg);});}protected:void initAsyncMessageTask();void release();void setRunningFlag(bool flag);bool loopCondition();void threadForEventHandler();void eventHandler();bool runningFlag_ = false;std::thread event_handler_thread_;VOS_CompName name_;std::shared_ptr<VOS_MsgQueue> msgQueue_;VOS_Interface& vos_instance_;
};bool VOS_IHandler::Impl::loopCondition()
{return runningFlag_;
}void VOS_IHandler::Impl::eventHandler()
{    while (loopCondition()){try{msgQueue_->handleMessage();}catch(const std::exception& e){return;}catch (...){return;}}
}void VOS_IHandler::Impl::threadForEventHandler()
{if(!event_handler_thread_.joinable()){event_handler_thread_ = std::thread([this](){this->eventHandler();});}
}void VOS_IHandler::Impl::setRunningFlag(bool flag)
{runningFlag_ = flag;
}
void VOS_IHandler::Impl::initAsyncMessageTask()
{setRunningFlag(true);threadForEventHandler();
}void VOS_IHandler::Impl::release()
{vos_instance_.VOS_RemoveMsgHandler(name_);setRunningFlag(false);if(event_handler_thread_.joinable()){msgQueue_->Stop();event_handler_thread_.join();}
}VOS_IHandler::VOS_IHandler(VOS_CompName name): pimpl_( std::make_shared<VOS_IHandler::Impl>(name) )
{}void  VOS_IHandler::start()
{auto handler = [this](const VosMessage &msg){this->handleMessage(msg);};pimpl_->start(handler);
}}
4.3.2 VOS_InterfaceImpl.cpp
#include "VOS_InterfaceImpl.h"
#include <algorithm>namespace VOS
{VOS_InterfaceImpl::VOS_InterfaceImpl(): timers_(std::make_shared<VOS_Timer>())
{timers_->addSubscriberCb([this](VOS_CompName name, VOSTimerType timerType){this->VOS_SendTimerMsg(name, timerType);});timers_->start();
}
VOS_InterfaceImpl:: ~VOS_InterfaceImpl()
{timers_.reset();
}void VOS_InterfaceImpl::VOS_AddMsgHandler(VOS_CompName name, MsgHandler&& eh)
{if(!eh){return;}std::unique_lock<std::mutex> lck(interface_lock_);if (handlers_.find(name) != handlers_.end()){handlers_[name] = std::move(eh);}else{handlers_.insert(std::make_pair(name, std::forward<MsgHandler>(eh)));}    
}void VOS_InterfaceImpl::VOS_RemoveMsgHandler(VOS_CompName name)
{std::unique_lock<std::mutex> lck(interface_lock_);auto handler = [](const VosMessage &){};if (handlers_.find(name) != handlers_.end()){handlers_[name] = std::move(handler);}}void VOS_InterfaceImpl::VOS_SendMsg(VOS_CompName name, VosMessage &message)
{std::unique_lock<std::mutex> lck(interface_lock_);innerSendMsg(name, message);
}void VOS_InterfaceImpl::innerSendMsg(VOS_CompName name, VosMessage &message)
{auto iter = handlers_.find(name);if(iter != handlers_.end()){iter->second(message);}//handlers_[name](message);
}void VOS_InterfaceImpl::VOS_SendTimerMsg(VOS_CompName name, VOSTimerType timerType)
{VosMessage message(VOS_MessageType::VOS_TIMER);message.msgData_.xTimerMsg.timerType = timerType;VOS_SendMsg(name, message);
}void VOS_InterfaceImpl::VOS_PublishEvent(VOS_EventID eventId, const VOSEventArg& arg)
{std::unique_lock<std::mutex> lck(interface_lock_);auto map_iter = event_map_.find(eventId);if(map_iter != event_map_.end()){std::for_each(map_iter->second.begin(),  map_iter->second.end(), [eventId, arg, this](VOS_CompName name){VosMessage message(VOS_MessageType::VOS_EVENT);message.msgData_.xEventMsg.eventId = static_cast<VOS_EventID> (eventId);message.msgData_.xEventMsg.arg = arg;this->innerSendMsg(name, message);});}
}void VOS_InterfaceImpl::VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name)
{std::unique_lock<std::mutex> lck(interface_lock_);auto map_iter = event_map_.find(eventId);if(map_iter != event_map_.end()){auto vec_iter = std::find(map_iter->second.begin(),  map_iter->second.end(), name);//already exist, returnif(vec_iter != map_iter->second.end()){return;}// push back new oneelse{map_iter->second.push_back(name);}}else{event_map_[eventId].push_back(name);}return;
}void VOS_InterfaceImpl::VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name)
{std::unique_lock<std::mutex> lck(interface_lock_);auto map_iter = event_map_.find(eventId);if(map_iter != event_map_.end()){auto vec_iter = std::find(map_iter->second.begin(),  map_iter->second.end(), name);//already exist, returnif(vec_iter != map_iter->second.end()){map_iter->second.erase(vec_iter);return;}}return;
}void VOS_InterfaceImpl::VOS_RegisterInitCB(VOS_CompName name)
{}timer_id VOS_InterfaceImpl::StartTimer(VOS_CompName name, VOSTimerType timerType, unsigned int expiredms, bool isPeriodic)
{std::unique_lock<std::mutex> lck(interface_lock_);return timers_->StartTimer(name, timerType, expiredms, isPeriodic );
}
#if 0
timer_id VOS_InterfaceImpl::StartTimer(VOS_CompName name, VOSTimerType timerType, const timestamp &when, unsigned int interval)
{std::unique_lock<std::mutex> lck(interface_lock_);return timers_->StartTimer(name, timerType, when, interval );
}
#endif
bool VOS_InterfaceImpl::Canceltimer(timer_id id)
{std::unique_lock<std::mutex> lck(interface_lock_);return timers_->Canceltimer(id);
}VOS_Interface& VOS_Interface::getInstance()
{static VOS_InterfaceImpl instance;return instance;
}}
4..3.3 VOS_TimerImpl.cpp
#include <algorithm>
#include "VOS_TimerImpl.h"namespace VOS
{VOS_Timer::VOS_Timer() : m{}, cond{},  events{}, time_events{}, free_ids{}
{}VOS_Timer:: ~VOS_Timer()
{release();
}void VOS_Timer::setRunningFlag(bool flag)
{runningFlag_ = flag;
}
void VOS_Timer::release()
{setRunningFlag(false);if(timer_thread_.joinable()){cond.notify_all();timer_thread_.join();}events.clear();time_events.clear();while (!free_ids.empty()){free_ids.pop();}
}bool VOS_Timer::loopCondition()
{return runningFlag_;
}void VOS_Timer::timerEngine()
{int i = 0;    while (loopCondition()){scoped_m lock(m);if (time_events.empty()){// Wait for workcond.wait(lock);}else{Time_event te = *time_events.begin();if (clock::now() >= te.next){// Remove time eventtime_events.erase(time_events.begin());// this is vital, remmber to addSubscriberCb firstlock.unlock();if(subScriberCb){subScriberCb(events[te.ref].name,events[te.ref].timerType);}lock.lock();if (events[te.ref].valid &&events[te.ref].period.count() > 0){// The event is valid and a periodic timer.te.next += events[te.ref].period;time_events.insert(te);}else{// The event is either no longer valid because it was// removed in the callback, or it is a one-shot timer.events[te.ref].valid = false;events[te.ref].name = VOS_CompName::COMP_Bottom;events[te.ref].timerType = VOSTimerType::VOSTimerType_BOTTOM;free_ids.push(te.ref);}}else{//cond.wait_until(lock, te.next);auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds> (te.next - std::chrono::steady_clock::now());auto wait_time = duration_ms < time_wait_step ?  duration_ms : time_wait_step;cond.wait_for(lock, wait_time);}}}}void VOS_Timer::threadForTimer()
{if(!timer_thread_.joinable()){timer_thread_ = std::thread([this](){this->timerEngine();});}
}}

简单党员测试代码(VC2022 上 google test的配置见之前文章)

4.4  UT 简单测试

VOS_IHandler_test.cpp
#include <iostream>
#include <thread>
#include <string>#include "VOS_IHandler.h"
#include "VOS_Interface.h"#include "gmock/gmock.h"
#include "gtest/gtest.h"using ::testing::_;
using ::testing::NiceMock;
using ::testing::Return;
using namespace testing;namespace VOS
{class VOS_IHandlerWrapper : public VOS_IHandler
{
public:VOS_IHandlerWrapper(VOS_CompName name):VOS_IHandler(name){}void handleMessage(const VosMessage &msg){messagenum++;switch (msg.msgType_){case VOS_MessageType::VOS_SYNC_MSG:sync_msg = msg.msgData_.xSyncMsg;break;case VOS_MessageType::VOS_TIMER:timer_msg = msg.msgData_.xTimerMsg;break;case VOS_MessageType::VOS_BOTTOM:break;case VOS_MessageType::VOS_EVENT:event_msg = msg.msgData_.xEventMsg;break;default:break;}}void SleepForPeriod(int numberOfseconds){std::this_thread::sleep_for(std::chrono::seconds(numberOfseconds));}int messagenum = 0;VOS_EVENT_MSG event_msg;VOS_SYNC_MSG  sync_msg;VOS_TIMER_MSG timer_msg;};class VOS_IHandlerTest : public ::testing::Test
{public:void SetUp() override{messageHandler_ar = std::make_shared<VOS_IHandlerWrapper>(VOS_CompName::COMP_AR);messageHandler_ar->start();messageHandler_red = std::make_shared<VOS_IHandlerWrapper>(VOS_CompName::COMP_RED);messageHandler_red->start();}std::shared_ptr<VOS_IHandlerWrapper> messageHandler_ar;std::shared_ptr<VOS_IHandlerWrapper> messageHandler_red;};TEST_F(VOS_IHandlerTest, VOS_SYNC_MSG_Process_OK)
{VosMessage message(VOS_MessageType::VOS_SYNC_MSG);message.msgData_.xSyncMsg.reserve_1 = 999;VOS_Interface::getInstance().VOS_SendMsg(VOS_CompName::COMP_AR, message);messageHandler_ar->SleepForPeriod(2);if(messageHandler_ar->messagenum != 0){EXPECT_EQ(messageHandler_ar->messagenum, 1);EXPECT_EQ(messageHandler_ar->sync_msg.reserve_1,  999);}
}TEST_F(VOS_IHandlerTest, VOS_PublishEvent_OK)
{VOS_Interface::getInstance().VOS_RegisterEvent( static_cast<VOS_EventID>(5), VOS_CompName::COMP_AR);VOS_EventID eventId;VOSEventArg arg;arg.repData = {1,2,3};VOS_Interface::getInstance().VOS_PublishEvent(static_cast<VOS_EventID>(5), arg);messageHandler_ar->SleepForPeriod(2);if(messageHandler_ar->messagenum != 0){EXPECT_EQ(messageHandler_ar->messagenum, 1);EXPECT_EQ(messageHandler_ar->event_msg.eventId,  static_cast<VOS_EventID>(5));EXPECT_EQ(messageHandler_ar->event_msg.arg.repData.programId,  1);EXPECT_EQ(messageHandler_ar->event_msg.arg.repData.assayNumber,  2);EXPECT_EQ(messageHandler_ar->event_msg.arg.repData.repIdx,  3);}VOS_Interface::getInstance().VOS_UnRegisterEvent( static_cast<VOS_EventID>(5), VOS_CompName::COMP_AR);
}TEST_F(VOS_IHandlerTest, VOS_TIMER_MSG_ONCE_TIMER_OK)
{auto timer_id = VOS_Interface::getInstance().StartTimer(VOS_CompName::COMP_AR, VOSTimerType::VOSTimerType_BOTTOM, 1000);messageHandler_ar->SleepForPeriod(2);if(messageHandler_ar->messagenum != 0){EXPECT_EQ(messageHandler_ar->messagenum, 1);EXPECT_EQ(messageHandler_ar->timer_msg.timerType,  VOSTimerType::VOSTimerType_BOTTOM);}
}TEST_F(VOS_IHandlerTest, VOS_TIMER_MSG_PERIODIC_TIMER_OK)
{auto timer_id = VOS_Interface::getInstance().StartTimer(VOS_CompName::COMP_AR, VOSTimerType::VOSTimerType_BOTTOM, 300, true);messageHandler_ar->SleepForPeriod(1);if(messageHandler_ar->messagenum != 0){EXPECT_EQ(messageHandler_ar->timer_msg.timerType,  VOSTimerType::VOSTimerType_BOTTOM);}auto messagenum = messageHandler_ar->messagenum;VOS_Interface::getInstance().Canceltimer(timer_id);messageHandler_ar->SleepForPeriod(1);EXPECT_EQ(messagenum, messageHandler_ar->messagenum);
}}

先写到这里,无烟无酒无故事。

修改记录: 

2024/07/02 开始UT,

1 发现死锁,VOS_PublishEvent->VOS_SendMsg  修改成  VOS_PublishEvent->innerSendMsg

2 timer等待最近的超时节点,原先用的wait_until,比如需要等待绝对时间2000ms的话,这期间 其他地方调用CancelTimer就会由于被time线程lock住了,造成pending状况,直到wait_until超时返回,因此粗暴的设置了一个100ms的最长等待周期,用于释放锁。那么CancelTimer从理论上就有可能pending超过100ms,这个在实时系统中是不可接受的,只能用于一般性的对时延不太敏感的系统中。

这一点其实不太合心意,后面再找找有没有更优秀的实现。

相关文章:

C++轻量级 线程间异步消息架构(向曾经工作的ROSA-RB以及共事的DOPRA的老兄弟们致敬)

1 啰嗦一番背景 这么多年&#xff0c;换着槽位做牛做马&#xff0c;没有什么钱途 手艺仍然很潮&#xff0c;唯有对于第一线的码农工作&#xff0c;孜孜不倦&#xff0c;其实没有啥进步&#xff0c;就是在不断地重复&#xff0c;刷熟练度&#xff0c;和同期的老兄弟们&#xf…...

Kotlin中的类

类初始化顺序 constructor 里的参数列表是首先被执行的&#xff0c;紧接着是 init 块和属性初始化器&#xff0c;最后是次构造函数的函数体。 主构造函数参数列表firstProperty 初始化第一个 init 块secondProperty 初始化第二个 init 块次构造函数函数体 class Example const…...

VSCode中常用的快捷键

通用操作快捷键 显示命令面板&#xff1a;Ctrl Shift P or F1&#xff0c;用于快速访问VSCode的各种命令。 快速打开&#xff1a;Ctrl P&#xff0c;可以快速打开文件、跳转到某个行号或搜索项目内容。 新建窗口/实例&#xff1a;Ctrl Shift N&#xff0c;用于打开一个新的…...

代码随想录-Day45

198. 打家劫舍 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报警。 给定一个代表每个…...

Rust Eq 和 PartialEq

Eq 和 PartialEq 在 Rust 中&#xff0c;想要重载操作符&#xff0c;你就需要实现对应的特征。 例如 <、<、> 和 > 需要实现 PartialOrd 特征: use std::fmt::Display;struct Pair<T> {x: T,y: T, }impl<T> Pair<T> {fn new(x: T, y: T) ->…...

思考如何学习一门编程语言?

一、什么是编程语言 编程语言是一种用于编写计算机程序的人工语言。通过编程语言&#xff0c;程序员可以向计算机发出指令&#xff0c;控制计算机执行各种任务和操作。编程语言由一组语法规则和语义规则组成&#xff0c;这些规则定义了如何编写代码以及代码的含义。 编程语言…...

顺序串算法库构建

学习贺利坚老师顺序串算法库 数据结构之自建算法库——顺序串_创建顺序串s1,创建顺序串s2-CSDN博客 本人详细解析博客 串的概念及操作_串的基本操作-CSDN博客 版本更新日志 V1.0: 在贺利坚老师算法库指导下, 结合本人详细解析博客思路基础上,进行测试, 加入异常弹出信息 v1.0补…...

[论文阅读笔记33] Matching Anything by Segmenting Anything (CVPR2024 highlight)

这篇文章借助SAM模型强大的泛化性&#xff0c;在任意域上进行任意的多目标跟踪&#xff0c;而无需任何额外的标注。 其核心思想就是在训练的过程中&#xff0c;利用strong augmentation对一张图片进行变换&#xff0c;然后用SAM分割出其中的对象&#xff0c;因此可以找到一组图…...

阿里Nacos下载、安装(保姆篇)

文章目录 Nacos下载版本选择Nacos安装Windows常见问题解决 更多相关内容可查看 Nacos下载 Nacos官方下载地址&#xff1a;https://github.com/alibaba/nacos/releases 码云拉取&#xff08;如果国外较慢或者拉取超时可以试一下国内地址&#xff09; //国外 git clone https:…...

四、golang基础之defer

文章目录 一、定义二、作用三、结果四、recover错误拦截 一、定义 defer语句被用于预定对一个函数的调用。可以把这类被defer语句调用的函数称为延迟函数。 二、作用 释放占用的资源捕捉处理异常输出日志 三、结果 如果一个函数中有多个defer语句&#xff0c;它们会以LIFO…...

机器人----四元素

四元素 四元素的大小 [-1,1] 欧拉角转四元素...

IBM Spectrum LSF Application Center 提供单一界面来管理应用程序、用户、资源和数据

IBM Spectrum LSF Application Center 提供单一界面来管理应用程序、用户、资源和数据 亮点 ● 简化应用程序管理 ● 提高您的工作效率 ● 降低资源管理的复杂性 ● 深入了解流程 IBM Spectrum LSF Application Center 为集群用户和管理员提供了一个灵活的、以应用为中心的界…...

如何选择品牌推广公司?哪家好?收费标准及评价!

不管是什么品牌&#xff0c;推广对公司的成败起了很关键的作用。然而&#xff0c;面对市面上琳琅满目的品牌推广公司&#xff0c;如何选择一家既熟悉又靠谱的公司&#xff0c;成为许多企业主面临的难题。 作为一家手工酸奶品牌的创始人&#xff0c;目前全国也复制了100多家门店…...

JDeveloper 12C 官网下载教程

首先、我们要登录Oracle官网 Oracle 甲骨文中国 | 云应用和云平台 登录进去如果不是中文可以点击右上角带有国旗的图标就行更改&#xff0c;选择一个你能看懂的文字。 然后&#xff0c;点击“资源”—点击“开发人员下载” 然后&#xff0c;点击“开发工具” 这里有很多工具可…...

中英双语介绍美国的州:印第安纳州(Indiana)

中文版 印第安纳州简介 印第安纳州位于美国中西部地区&#xff0c;是一个以其农业、制造业和体育文化而著称的州。以下是对印第安纳州的详细介绍&#xff0c;包括其地理位置、人口、经济、教育、文化和主要城市。 地理位置 印第安纳州东临俄亥俄州&#xff0c;北接密歇根州…...

Flink实现准确和高效流处理的关键问题

时间相关: Watermark 水位线 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据。水位线主要的内容是一个时间戳,用来表示当前事件时间的进展。水位线是基于数据的时间戳生成的。水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进,进展。水位线…...

isidentifier()方法——判断字符串是否为合法的Python标识符或变量名

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 语法参考 isidentifier()方法用于判断字符串是否是有效的Python标识符&#xff0c;还可以用来判断变量名是否合法。isidentifier()方法的语法格式如…...

天猫商品列表数据接口(Tmall.item_search)

天猫平台商品列表数据接口&#xff08;taobao.item_search&#xff09;是天猫开放平台提供的一个API接口&#xff0c;用于获取天猫平台上的商品列表数据。通过该接口&#xff0c;用户可以获取到商品的名称、价格、销量、评价等信息。下面将具体介绍这个接口的各个方面&#xff…...

React+TS前台项目实战(二十一)-- Search业务组件封装实现全局搜索

文章目录 前言一、Search组件封装1. 效果展示2. 功能分析3. 代码详细注释4. 使用方式 二、搜索结果展示组件封装1. 功能分析2. 代码详细注释 三、引用到文件&#xff0c;自行取用总结 前言 今天&#xff0c;我们来封装一个业务灵巧的组件&#xff0c;它集成了全局搜索和展示搜…...

SEO与AI的结合:如何用ChatGPT生成符合搜索引擎优化的内容

在当今数字时代&#xff0c;搜索引擎优化&#xff08;SEO&#xff09;已成为每个网站和内容创作者都必须掌握的一项技能。SEO的主要目标是通过优化内容&#xff0c;使其在搜索引擎结果页面&#xff08;SERP&#xff09;中排名更高&#xff0c;从而吸引更多的流量。然而&#xf…...

【信息系统项目管理师知识点速记】组织通用管理:知识管理

23.3 知识管理 23.3.1 知识管理基础 知识管理是通过利用各种知识和技术手段,帮助组织和个人生产、分享、应用和创新知识,以形成知识优势并在个人、组织、业务目标、经济绩效和社会效益方面产生价值的过程。它能为组织带来知识增值,创造新的价值,提升决策效能和水平,是提…...

CM-UNet: Hybrid CNN-Mamba UNet for Remote Sensing Image Semantic Segmentation

论文&#xff1a;CM-UNet: Hybrid &#xff1a;CNN-Mamba UNet for Remote Sensing Image Semantic Segmentation 代码&#xff1a;https://github.com/XiaoBuL/CM-UNet Abstrcat: 由于大规模图像尺寸和对象变化&#xff0c;当前基于 CNN 和 Transformer 的遥感图像语义分割方…...

DP:子序列问题

文章目录 什么是子序列子序列的特点举例说明常见问题 关于子序列问题的几个例题1.最长递增子序列2.摆动序列3.最长递增子序列的个数4.最长数对链5.最长定差子序列 总结 什么是子序列 在计算机科学和数学中&#xff0c;子序列&#xff08;Subsequence&#xff09;是指从一个序列…...

Spring Data与多数据源配置

Spring Data与多数据源配置 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们来探讨如何在Spring Data中配置和使用多个数据源。 在现代应用程序中&…...

【前端vue3】TypeScrip-类型推论和类型别名

类型推论 TypeScript里&#xff0c;在有些没有明确指出类型的地方&#xff0c;类型推论会帮助提供类型。 例如&#xff1a; 变量xiaoc被推断类型为string 如重新给xiaoc赋值数字会报错 let xiaoc "xiaoc"xiaoc 1111111111111如没有给变量指定类型和赋值&#xf…...

javaEE——Servlet

1.web开发概述 所谓web开发,指的是从网页中向后端程序发送请求,与后端程序进行交互 2.java后端开发环境搭建 web后端(javaEE)程序需要运行在服务器中的&#xff0c;这样前端才可以访问得到 3.服务器是什么&#xff1f; ①服务器就是一款软件&#xff0c;可以向其发送请求&#…...

Kotlin扩展函数(also apply run let)和with函数

also apply run let with的使用例子 private fun testOperator() {/*** also*/val person Person("ZhangSan", 18)person.also {// 通常仅仅打印使用, 也可以通过it修改it.name "ZhangSan1"println("also inner name: " it.name)}println(&qu…...

C语言笔记27 •单链表介绍•

1.链表的概念及结构 链表是⼀种物理存储结构上非连续、非顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表 中的指针链接次序实现的。 2. 顺序表带来的问题 (1)中间/头部的插⼊删除&#xff0c;时间复杂度为O(N) (2)增容需要申请新空间&#xff0c;拷⻉数据&#xff…...

C++编程(五)单例模式 友元

文章目录 一、单例模式&#xff08;一&#xff09;概念&#xff08;二&#xff09;实现方式1. 饿汉式2. 懒汉式 二、友元&#xff08;一&#xff09;概念&#xff08;二&#xff09;友元函数1.概念2.语法格式3. 使用示例访问静态成员变量访问非静态成员变量 &#xff08;三&…...

012-GeoGebra基础篇-构造圆的切线

前边文章对于基础内容已经悉数覆盖了&#xff0c;这一篇我就不放具体的细节&#xff0c;若有需要可以复刻一下 目录 一、成品展示二、算式内容三、正确性检查五、文章最后 一、成品展示 二、算式内容 A(0,0) B(3,0) c: Circle(A,B) C(5,4) sSegment(A,C) DMidpoint(s) d: Circ…...