项目扩展二:消息拉取功能的实现
项目扩展二:消息拉取功能的实现
- 一、回顾一下消息推送功能是如何实现的
- 二、设计消息拉取功能
- 1.服务器如何处理
- 2.定义Request和Response
- 1.定义Request
- 2.proto文件
- 三、服务器实现消息拉取
- 1.业务模块的实现:信道模块
- 2.消费者管理模块实现O(1)获取消费者
- 1.目前的情形
- 2.加一个哈希表?
- 3.如何做?
- 4.代码
- 3.信道模块实现
- 4.broker服务器注册响应业务函数
- 四、客户端修改
- 五、修改消息推送逻辑--设计
- 1.考量
- 2.要不要给BOTH消息推送机制呢?
- 3.设计
- 六、实现
- 1.修改消息proto文件
- 2.修改队列消息管理模块
- 1.成员的修改
- 2.recovery恢复历史消息的修改
- 3.发布消息的修改
- 3.front的修改
- 4.clear的修改
- 3.总体消息管理模块修改
- 1.PublishMessage加一个参数
- 2.获取链表队头消息修改
- 4.虚拟机模块的修改
- 1.发布消息增加一个参数
- 1.虚拟机模块
- 2.虚拟机管理模块
- 2.推送和拉取消息的修改
- 1.虚拟机模块
- 2.虚拟机管理模块
- 5.proto网络通信协议修改
- 1.BasicPublishRequest
- 6.信道模块修改
- 1.发布消息的修改
- 2.publishCallback的修改
- 1.坑点--连锁BUG
- 2.解决方案
- 3.实现接口
- 4.publishCallback的修改
- 3.拉取消息的修改
- 7.客户端修改
- 七、验证
- 1.消息拉取功能与恢复功能联合测试
- 1.测试1
- 1.生产者
- 2.消费者
- 3.演示
- 2.测试2 -- 演示
- 2.PULL测试
- 2.BOTH测试
一、回顾一下消息推送功能是如何实现的
这其中一共有三个角色:
二、设计消息拉取功能
给客户端多提供一个服务:消息拉取服务
其实就是从该消费者订阅的队列当中取出一个消息,推送给该消费者
只不过这个消息拉取是由消费者主动向我们服务器发起请求的
因此,我们要考虑两点:
- 服务器如何主动向消费者发送消息
- 网络通信协议中Request和Response的定义
1.服务器如何处理
要完成:主动向消费者发送消息这一任务,需要两个模块:消费者管理模块和虚拟机模块
- 从消费者管理模块当中取出消费者
- 从虚拟机模块当中取出消息
然后调用该消费者的消费处理回调函数,向客户端发送BasicConsumeResponse给客户端它需要的参数,告诉客户端“你可以消费了”
总体而言,并不难,因为我们早已把具体功能模块化,然后需要使用哪些功能,找对应的负责人【模块句柄】即可
这就是面向对象的模块化编程的独特魅力
2.定义Request和Response
Response的话,依然只需要BasicCommonResponse和BasicConsumeResponse即可
BasicCommonResponse是基础响应,对客户端的每条Request都有ACK
因此,我们只需要完成定义Request的任务即可
1.定义Request
首先,req_id和channel_id必然需要。下面我们根据消息拉取这一功能考虑所需参数
首先,消费者拉取消息,因此我们需要找到该消费者,所以需要vhost_name,queue_name和consumer_tag
有了这三个字段,我们就能从消费者管理模块拿取该消费者对应信息
然后我们就可以推送消息了,无需消费者再提供任何信息
2.proto文件
因此,我们往proto文件当中新增一个message消息体
//8. 消息的拉取
message BasicPullRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string queue_name = 4;string consumer_tag = 5;
}
然后编译
protoc --cpp_out=. mq_proto.proto
三、服务器实现消息拉取
1.业务模块的实现:信道模块
我们的信道是实现具体业务,提供具体服务模块,他内部整合了虚拟机模块和消费者管理模块。
因此我们只需要加一个函数,它的任务就是:
- 根据consumer_tag这三个字段 从消费者管理模块当中取出消费者
- 从虚拟机模块当中取出消息
- 调用该消费者的消费处理回调函数
- 如果该消费者有自动确认标志,则进行自动确认
其中,第3解耦且耗时,第4步我们想要它在第三步结束之后才开始,
我们想要解放信道服务线程,因此我们把第4步跟第3步放到一起交给异步工作线程
2.消费者管理模块实现O(1)获取消费者
我们的消费者管理模块还没有实现获取消费者这一接口,而这一接口在增加了消息拉取功能之后,又很常用
且基于之前的数据结构来实现,效率为O(N),所以我们需要改进一下,提高效率
1.目前的情形
std::vector<Consumer::ptr> _consumer_vec;
size_t _seq;
之前为了实现RR轮转的负载均衡,我们通过vector和一个轮询序号实现了队列消费者管理模块
新增,删除,都是O(N)【因为新增和删除消费者的需求频率并不高,所以没什么大碍】,主要是负载均衡select是O(1),所以设计总体来说还可以
但是目前我们想要增加的是根据consumer_tag来O(1)获取消费者,而vector只能O(N),所以不满足需求
2.加一个哈希表?
这里的哈希表的value_type不能是vector的迭代器,因为vector的扩容和删除都有迭代器失效问题
删除导致的迭代器失效问题还好解决,但是扩容导致的迭代器失效问题不好解决,因为vector的扩容被它通过封装屏蔽掉了,倒是也能检测【通过不断检测capcity()】,不过非常不优雅
所以哈希表的value_type搞成Consumer::ptr的话,虽然可以快速查找,但是这样的话,哈希表的用途就不大的
因为哈希表只能让查询操作变为O(1),但是删除还是O(N),因为vector还是要遍历删除的
3.如何做?
哈希表不能跟vector打好配合,所以我们将vector改为list,将RR轮转的负载均衡变为LRU式的负载均衡
每次select时从队头取最近最少使用的消费者,访问之后放到队尾
4.代码
_consumer_list的队头是最近最少使用的,队尾是最近访问的
因此:
- select获取消费者之后,将该消费者挪到队尾
- get获取消费者之后,将该消费者挪到队尾
- 新增消费者,将该消费者放到队头(因为该消费者的负载必为0)
class QueueConsumerManager
{
public:using ptr = std::shared_ptr<QueueConsumerManager>;QueueConsumerManager(const std::string &vhost_name, const std::string &qname): _vhost_name(vhost_name), _qname(qname) {}// 1. 新增消费者[只有想要订阅当前队列消息的消费者才会调用该函数]Consumer::ptr createConsumer(const std::string &consumer_tag, const ConsumerCallback &callback, bool auto_ack){// 1. 加锁,并查找是否有该消费者std::unique_lock<std::mutex> ulock(_mutex);auto iter_map = _consumer_map.find(consumer_tag);if (iter_map != _consumer_map.end()){iter_type iter_list = iter_map->second; // iter_list是链表的迭代器return *iter_list;}// 2. 从队头插入该消费者Consumer::ptr cp = std::make_shared<Consumer>(consumer_tag, callback, _vhost_name, _qname, auto_ack);_consumer_list.push_front(cp);_consumer_map.insert(std::make_pair(consumer_tag, _consumer_list.begin()));return cp;}void removeConsumer(const std::string &consumer_tag){// 加锁并删除该消费者std::unique_lock<std::mutex> ulock(_mutex);auto iter_map = _consumer_map.find(consumer_tag);if (iter_map == _consumer_map.end())return;iter_type iter_list = iter_map->second; // iter_list是链表的迭代器// 删除_consumer_list.erase(iter_list);_consumer_map.erase(iter_map);}Consumer::ptr selectConsumer(){// 0. 加锁并判断是否为空std::unique_lock<std::mutex> ulock(_mutex);if (_consumer_list.empty()){default_warning("获取消费者失败,因为该队列没有消费者,虚拟机名称:%s ,队列名:",_vhost_name.c_str(),_qname.c_str());return Consumer::ptr();}// 1. 拿到队头消费者Consumer::ptr cp = _consumer_list.front();// 2. 将队头消费者移到队尾_consumer_list.splice(_consumer_list.end(), _consumer_list, _consumer_list.begin());// 因为splice是转移节点,不会导致迭代器失效,所以无需更新哈希表return cp;}bool exist(const std::string &consumer_tag){std::unique_lock<std::mutex> ulock(_mutex);return _consumer_map.count(consumer_tag) > 0;}bool empty(){std::unique_lock<std::mutex> ulock(_mutex);return _consumer_map.empty();}void clear(){std::unique_lock<std::mutex> ulock(_mutex);_consumer_map.clear();}// 支持通过消费者tag来获取消费者,这里用哈希表来提高查询效率// 这里的哈希表的value_type不能是vector的迭代器,因为vector的扩容和删除都有迭代器失效问题// 删除导致的迭代器失效问题还好解决,但是扩容导致的迭代器失效问题不好解决,因为vector的扩容被它通过封装屏蔽掉了,倒是也能检测【通过不断检测capcity()】,不过非常不优雅// 所以哈希表的value_type搞成Consumer::ptr的话,虽然可以快速查找,但是这样的话,哈希表的用途就不大的// 因为哈希表只能让查询操作变为O(1),但是删除还是O(N),因为vector还是要遍历的// 那能否就单纯只有一个vector呢?不能,消费者删除需求并不高,所以曾经我们用的vector,但是有了消息拉取功能之后,获取消费者的需求就很高了// 所以不能只有一个vector,必须要有一个哈希表// 而哈希表不能跟vector打好配合,所以我们将vector改为list,将RR轮转的负载均衡变为LRU式的负载均衡// 每次select时从队头取最近最少使用的消费者,访问之后放到队尾Consumer::ptr getConsumer(const std::string &consumer_tag){std::unique_lock<std::mutex> ulock(_mutex);auto iter_map = _consumer_map.find(consumer_tag);if (iter_map != _consumer_map.end()){iter_type iter_list = iter_map->second; // iter_list是链表的迭代器// 将iter_list移动到队尾_consumer_list.splice(_consumer_list.end(), _consumer_list, iter_list);return *iter_list;}return Consumer::ptr();}private:using iter_type = std::list<Consumer::ptr>::iterator;std::string _vhost_name;std::string _qname;std::mutex _mutex;std::list<Consumer::ptr> _consumer_list;std::unordered_map<std::string, iter_type> _consumer_map;
};
然后在总体消费者管理模块当中添加:
Consumer::ptr getConsumer(const std::string vhost_name, const std::string &qname, const std::string &consumer_tag)
{std::ostringstream oss;oss << "获取消费者失败,因为未能找到该队列消费者管理模块,qname = " << qname << "\n";QueueConsumerManager::ptr qcmp = getQueueConsumerManager(vhost_name, qname, oss);if (qcmp.get() == nullptr){return Consumer::ptr();}return qcmp->getConsumer(consumer_tag);
}
3.信道模块实现
- 拿到消费者和消息
- 封装异步任务,抛入线程池
异步任务:- 调用该消费者的消费处理回调函数
- auto_ack的问题
- 返回基础响应
不要忘了:服务器所描述的消费者的消费处理回调函数的功能仅仅是:
向消费者发送基础消费处理响应BasicConsumeResponse
void basicPull(const BasicPullRequestPtr &req)
{// 1. 拿到该消费者Consumer::ptr cp = _consumer_manager_ptr->getConsumer(req->vhost_name(), req->queue_name(), req->consumer_tag());if(cp.get()==nullptr){default_error("拉取消息失败,因为消费者不存在,消费者tag:%s",req->consumer_tag().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 2. 拿到消息MessagePtr mp = _vhost_manager_ptr->basicConsume(req->vhost_name(), req->queue_name());if(mp.get()==nullptr){default_error("拉取消息失败,因为该队列没有待推送消息,队列名:%s",req->queue_name().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 3. 封装异步任务,抛入线程池auto func = [cp, mp, req, this](){// 3. 调用该消费者的消费处理回调函数cp->_callback(cp->_consumer_tag, mp->mutable_valid()->mutable_properities(), mp->valid().body());// 4. auto_ack的问题if (cp->_auto_ack){this->_vhost_manager_ptr->basicAck(req->vhost_name(), req->queue_name(), mp->valid().properities().msg_id());}};_pool_ptr->put(func);// 4. 基础相应basicResponse(req->req_id(),req->channel_id(),true);
}
4.broker服务器注册响应业务函数
_dispatcher.registerMessageCallback<BasicPullRequest>(std::bind(&Server::OnBasicPull,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
// 8. 消息拉取
void OnBasicPull(const muduo::net::TcpConnectionPtr &conn, const BasicPullRequestPtr &req, muduo::Timestamp)
{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("确认消息时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("确认消息失败,因为获取信道失败");return;}mychannel->basicPull(req);
}
四、客户端修改
客户端只需要修改Channel即可,
// 消息拉取
bool BasicPull()
{if (_consumer.get() == nullptr){default_error("消息拉取失败,该信道没有关联消费者");return false;}BasicPullRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(_consumer->_vhost_name);req.set_consumer_tag(_consumer->_consumer_tag);req.set_queue_name(_consumer->_queue_name);// 发送请求_codec->send(_conn, req);std::ostringstream oss;BasicCommonResponsePtr resp = waitResponse(rid);if (resp->ok()){default_info("消息拉取成功: %s",_consumer->_consumer_tag.c_str());}else{default_info("消息拉取失败: %s",_consumer->_consumer_tag.c_str());}return resp->ok();
}
五、修改消息推送逻辑–设计
1.考量
我们之前的消息推送逻辑是:当生产者发布消息之后,我们会立刻找消费者去推送该消息,如果没有消费者,那么就会丢弃该消息
这个逻辑在之前只有消息推送功能时,是正确的,因为不丢弃消息就是浪费资源
而现在我们实现了消息拉取功能,此时这种情况就可以不丢弃消息,而将其存放到队列当中,等待消费者进行拉取
但是推送的消息要不要跟拉取的消息分割一下呢,
让生产者发布消息时选择是推送的,还是拉取的,还是都有
因为一些消息具有实时性
,更希望快速被处理,就可以放到待推送
当中。而那些对实时性没这么高要求
的,就可以放到待确认
2.要不要给BOTH消息推送机制呢?
BOTH的意思是:这个消息既放到待推送链表当中,又放到待拉取链表当中
因为我们的消息推送是放到异步线程池当中去跑的,所以存在拉取快于推送的情况
哪个快听谁的,如果服务器先推送,那么这个消息就按推送走
如果服务器先拉取,那么这个消息就按拉取走
因为我们拉取之后,会将该消息从推送当中删除
推送之后,会将该消息从拉取当中删除,而且操作都因为加了互斥锁而成为了原子操作
所以不会存在同一消息被消费2次的情形
因为消息被消费只有3种情况:
- 推送(推送时会将消息从拉取当中删除,所以不怕该消息同时被拉取)
- 推送进行之前被拉取(此时推送就找不到消息了,没事)
- 推送失败之后被拉取(推送时会从拉取当中删除,推送失败之后会将消息放到拉取当中,所以没事)
因此BOTH是可以的,所以我们给上BOTH
3.设计
如何修改呢?
消息推送如果失败,也是将其放到待拉取消息链表当中,等待消费者主动拉取
持久化消息恢复之后,将其放到待拉取消息链表当中,等待消费者主动拉取
现在有个问题:
既然持久化的消息在恢复之后是直接放到待拉取消息链表当中的,那么有必要将消息的推送机制一并持久化吗?
没必要
六、实现
1.修改消息proto文件
// 3. 消息的推送机制:推送/拉取/推+拉
enum PublishMechanism
{UNKNOWNMECHANISM = 0;PUSH = 1;PULL = 2;BOTH = 3;
}// 4. 消息的基本属性
message BasicProperities
{string msg_id = 1;DeliveryMode mode = 2;string routing_key = 3;
}message Message
{message ValidLoad{string body = 1;BasicProperities properities = 2;string valid = 3;// 因为bool的true/false在protobuf当中持久化后的长度不同,因此我们不用bool,而是用"0"代表无效,"1"代表有效}ValidLoad valid = 1;uint64 offset = 2;uint64 len = 3; PublishMechanism mechanism = 4;
}
protoc --cpp_out=. mq_msg.proto
2.修改队列消息管理模块
1.成员的修改
内部类
struct iter_node
{using iter_type = std::list<MessagePtr>::iterator;iter_type push_iter;iter_type pull_iter;
};std::list<MessagePtr> _waitpush_list;
std::list<MessagePtr> _waitpull_list;
std::unordered_map<MessagePtr,iter_node> _waitpublish_map;
2.recovery恢复历史消息的修改
把别忘了gc恢复的待拉取消息链表存入_waitpublish_map当中!!
void recovery()
{std::unique_lock<std::mutex> ulock(_mutex);// 1. 恢复历史消息,将消息放入待拉取消息链表当中_waitpull_list = _mapper.gc();// 2. 遍历待拉取消息链表,将其中的消息都放入_waitpublish_map当中for (auto iter_list = _waitpull_list.begin(); iter_list != _waitpull_list.end(); ++iter_list){MessagePtr mp = *iter_list;_waitpublish_map[mp].pull_iter = iter_list;_waitpublish_map[mp].push_iter = _waitpush_list.end();}// 3. 将gc后的消息放到持久化哈希表中for (auto &mp : _waitpull_list){_durable_map[mp->valid().properities().msg_id()] = mp;}// 2. 更新持久化消息总数和有效消息总数_total_count = _valid_count = _durable_map.size();
}
3.发布消息的修改
bool publishMessage(const BasicProperities *bp, const std::string &body, DeliveryMode mode, PublishMechanism mechanism)
{// 消息能否持久化取决于队列是否持久化,因此消息的持久化与否不仅要看bp当中的mode,还要看DeliveryMode mode// 只有当DeliveryMode mode是持久化时,才看bp当中的mode,否则一律不持久化// 1. 构建消息智能指针MessagePtr mp = std::make_shared<Message>();mp->mutable_valid()->set_body(body);mp->mutable_valid()->set_valid("1");mp->mutable_valid()->mutable_properities()->set_msg_id(bp->msg_id());mp->mutable_valid()->mutable_properities()->set_routing_key(bp->routing_key());DeliveryMode final_mode = (mode == DURABLE && bp->mode() == DURABLE) ? DURABLE : UNDURABLE;mp->mutable_valid()->mutable_properities()->set_mode(final_mode);mp->set_mechanism(mechanism);// 加锁std::unique_lock<std::mutex> ulock(_mutex);// 2. 看是否需要持久化if (final_mode == DURABLE){if (!_mapper.insert(mp)){default_error("发布消息失败, 因为消息持久化失败, 消息ID: %s",bp->msg_id().c_str());return false;}// 放到持久化哈希表中_durable_map[bp->msg_id()] = mp;_total_count++;_valid_count++;}// 3. 根据消息发布机制来进行消息存储// 1. 放到待推送链表if (mechanism == PUSH){_waitpush_list.push_back(mp);_waitpublish_map[mp].push_iter = std::prev(_waitpush_list.end());_waitpublish_map[mp].pull_iter = _waitpull_list.end();}else if (mechanism == PULL){_waitpull_list.push_back(mp);_waitpublish_map[mp].push_iter = _waitpush_list.end();_waitpublish_map[mp].pull_iter = std::prev(_waitpull_list.end());}else if (mechanism == BOTH){_waitpush_list.push_back(mp);_waitpull_list.push_back(mp);_waitpublish_map[mp].push_iter = std::prev(_waitpush_list.end());_waitpublish_map[mp].pull_iter = std::prev(_waitpull_list.end());}else{default_error("发布消息失败,因为消息的发布机制未知, 消息ID: %s",bp->msg_id().c_str());return false;}return true;
}
3.front的修改
抽离出一个共用函数:
MessagePtr front(std::list<MessagePtr> &main_list, std::list<MessagePtr> &sub_list, bool ispush)
{std::unique_lock<std::mutex> ulock(_mutex);// 0.加锁并判空if (main_list.empty()){return MessagePtr();}// 1.从链表取消息,设置待确认状态MessagePtr mp = main_list.front();main_list.pop_front();_waitack_map[mp->valid().properities().msg_id()] = mp;// 2.在另一个链表当中进行删除auto iter_hash = _waitpublish_map.find(mp);// 假设它是push,那么另一个链表就是pulliter_node::iter_type iter_list = iter_hash->second.pull_iter;if (!ispush){iter_list = iter_hash->second.push_iter;}if (iter_list != sub_list.end()){sub_list.erase(iter_list);}// 3.在哈希表当中删除_waitpublish_map.erase(iter_hash);return mp;
}
从待推送链表取消息:
MessagePtr push_list_front()
{return front(_waitpush_list,_waitpull_list,true);
}
从待拉取链表取消息:
MessagePtr pull_list_front()
{return front(_waitpull_list,_waitpush_list,false);
}
4.clear的修改
// 需要提供销毁该队列所有信息的方法(删除队列时要用)
void clear()
{std::unique_lock<std::mutex> ulock(_mutex);_mapper.removeFile();_waitpush_list.clear();_waitpull_list.clear();_waitpublish_map.clear();_waitack_map.clear();_durable_map.clear();_valid_count = _total_count = 0;
}
3.总体消息管理模块修改
1.PublishMessage加一个参数
bool publishMessage(const std::string &qname, const BasicProperities *bp, const std::string &body, DeliveryMode mode, PublishMechanism mechanism)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("发布消息失败,因为该队列的消息管理模块句柄尚未初始化");return false;}qmmp = iter->second;}return qmmp->publishMessage(bp, body, mode, mechanism);
}
2.获取链表队头消息修改
MessagePtr push_list_front(const std::string &qname)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("获取待推送消息失败,因为该队列的消息管理模块句柄尚未初始化");return MessagePtr();}qmmp = iter->second;}return qmmp->push_list_front();
}MessagePtr pull_list_front(const std::string &qname)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("获取待拉取消息失败,因为该队列的消息管理模块句柄尚未初始化");return MessagePtr();}qmmp = iter->second;}return qmmp->pull_list_front();
}
至此,消息模块就搞完了,下面顺着这个层状结构往上找,去修改虚拟机
4.虚拟机模块的修改
1.发布消息增加一个参数
1.虚拟机模块
bool basicPublish(const std::string &qname, const BasicProperities *bp, const std::string &body,PublishMechanism mechanism)
{// 在这里能够知道队列的持久化方式,因此就能够传递durable了// 1. 查找该队列的ptr,看是否存在,拿到durable// 2. 发布消息MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("发布消息失败,因为该队列不存在, 队列名: %s",qname.c_str());return false;}return _mmp->publishMessage(qname, bp, body, (mqp->durable) ? DURABLE : UNDURABLE,mechanism);
}
2.虚拟机管理模块
bool basicPublish(const std::string &vname, const std::string &qname, const BasicProperities *bp, const std::string &body, PublishMechanism mechanism)
{std::ostringstream oss;oss << "发布消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->basicPublish(qname, bp, body, mechanism);
}
2.推送和拉取消息的修改
1.虚拟机模块
// 推送[消费]消息
MessagePtr basicConsume(const std::string &qname)
{return _mmp->push_list_front(qname);
}// 拉取消息
MessagePtr basicPull(const std::string &qname)
{return _mmp->pull_list_front(qname);
}
2.虚拟机管理模块
// 推送[消费]消息
MessagePtr basicConsume(const std::string &vname, const std::string &qname)
{std::ostringstream oss;oss << "推送消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MessagePtr();}return vhp->basicConsume(qname);
}// 拉取消息
MessagePtr basicPull(const std::string &vname, const std::string &qname)
{std::ostringstream oss;oss << "拉取消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MessagePtr();}return vhp->basicPull(qname);
}
然后虚拟机模块就搞定了,再往上走,修改信道
5.proto网络通信协议修改
因为信道是网络服务模块,且生产者要提供的参数多了一个,所以我们需要修改一下proto网络通信协议
1.BasicPublishRequest
其实就是给BasicPublishRequest加一个PublishMechanism 字段而已
//7. 消息的发布与确认
message BasicPublishRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string exchange_name = 4; //需要用户指定:消息发布到哪个交换机上,然后我们给他进行路由匹配,放到对应队列当中string body = 5;BasicProperities properities = 6;PublishMechanism mechanism = 7;// 消息的发布方式
}
protoc --cpp_out=. mq_proto.proto
6.信道模块修改
1.发布消息的修改
- 给basicPublish多传一个参数req->mechanism()
- 只有当该消息是PUSH的发布机制时,才需要将publishCallback封装为异步任务抛入线程池
void basicPublish(const BasicPublishRequestPtr &req)
{// 1. 先找到该交换机的交换机类型Exchange::ptr ep = _vhost_manager_ptr->getExchange(req->vhost_name(), req->exchange_name());if (ep.get() == nullptr){default_error("发布消息失败,因为交换机不存在\n,交换机名称:%s",req->exchange_name().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 2. 先找到消息发布的交换机 绑定的所有队列MsgQueueBindingMap qmap = _vhost_manager_ptr->getAllBindingsByExchange(req->vhost_name(), req->exchange_name());// 3. 遍历所有队列,进行路由匹配与消息投递for (auto &kv : qmap){Binding::ptr bp = kv.second;BasicProperities *properities = nullptr;::std::string routing_key;if (req->has_properities()){properities = req->mutable_properities();routing_key = properities->routing_key();}if (Router::route(routing_key, bp->binding_key, ep->type)){// 把消息投递到指定队列_vhost_manager_ptr->basicPublish(req->vhost_name(), bp->queue_name, properities, req->body(), req->mechanism());// 判断该消息是否需要推送if (req->mechanism() == PUSH || req->mechanism() == BOTH){// 5. 向线程池添加一个消息消费任务,消费任务交给线程池中的线程去做,解放Channel线程去做更重要的任务auto func = ::std::bind(&Channel::publishCallback, this, req->vhost_name(), bp->queue_name);_pool_ptr->put(func);}}}// 返回响应即可basicResponse(req->req_id(), req->channel_id(), true);
}
2.publishCallback的修改
注意注意:
- 当publishCallback推送消息找不到消费者时,要将该消息放到待拉取消息链表当中
- 放过去的时候,将消息推送机制改为PULL
1.坑点–连锁BUG
这里有一个连锁BUG问题:
我们复用basicPublish的时候,会复用到消息管理模块当中的新增消息
如果我们的消息是持久化的,那么就会重复持久化,持久化时又会修改消息结构体当中的offset字段,因此ACK的时候就只能删除持久化消息副本,而无法删除原件
此时就BUG了:那么我们想当然地就会这么想:
那我把DeliveryMode改成UNDURABLE不就行了?
mp->mutable_valid()->mutable_properities()->set_mode(UNDURABLE);
不行的,因为我们ACK的时候,是否需要删除持久化消息是看该消息的DeliveryMode
因此这样的话,ACK时就无法删除该消息了,也是BUG
怎么办呢?
2.解决方案
-
从需求解决问题:
我们想要的其实就是把一个MessagePtr放到待拉取消息链表当中,因此让消息管理模块提供这么一个接口不就行了吗 -
从复用方面解决问题:
我们依然选择持久化,但是在调用basicPublish之前先调用一下basicAck,将原件先ACK了 -
从ACK方面方面解决问题:
修改ACK:是否删除持久化消息不依据DeliveryMode,而是依赖于该消息是否在持久化哈希表当中
下面我们进行选择:
- 效率:第2种由于需要复用ACK,也就需要进行IO操作,效率低,所以淘汰第2种
- 业务/代码优雅角度:第3种必须依赖于持久化哈希表,而不能依赖于DeliveryMode,不好,而且修改DeliveryMode不利于排查BUG时的调试,不好
因此,我们选择第1种
3.实现接口
QueueMessageManager:
// 提供向待拉取消息链表当中插入数据
void insert_pull(const MessagePtr& mp)
{std::unique_lock<std::mutex> ulock(_mutex);_waitpull_list.push_back(mp);_waitpublish_map[mp].pull_iter=std::prev(_waitpull_list.end());_waitpublish_map[mp].push_iter=_waitpush_list.end();
}
MessageManager:
// 提供向待拉取消息链表当中插入数据
bool insert_pull(const std::string &qname, const MessagePtr &mp)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("发布消息失败,因为该队列的消息管理模块句柄尚未初始化");return false;}qmmp = iter->second;}qmmp->insert_pull(mp);return true;
}
VirtualHost:
// 将消息放入待拉取消息链表当中
bool insert_pull(const std::string& qname,const MessagePtr& mp)
{return _mmp->insert_pull(qname,mp);
}
VirtualHostManager:
// 将消息放入待拉取消息链表当中
bool insert_pull(const std::string &vname, const std::string &qname, const MessagePtr &mp)
{std::ostringstream oss;oss << "将消息放入待拉取消息链表失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->insert_pull(qname, mp);
}
4.publishCallback的修改
// 推送消息(取出消息,取出消费者,调用对应消费者的消费处理回调函数)
// 不能先取消费者,因为那样会导致 在无消费者的情况下,待推送消息在链表当中堆积的情况
// 而通过先取消息,再取消费者,将消息放到待拉取消息链表当中,等待有消费者拉取
void publishCallback(const ::std::string &vname, const ::std::string &qname)
{// 1.取出消息MessagePtr mp = _vhost_manager_ptr->basicConsume(vname, qname);if (mp.get() == nullptr){default_info("消息的消费失败, 因为消息队列为空,没有消息: %s",qname.c_str());return;}// 2.取出消费者Consumer::ptr cp = _consumer_manager_ptr->selectConsumer(vname, qname);if (cp.get() == nullptr){default_info("该队列中暂无消费者,将该消息放入该队列的待拉取消息链表当中 %s",qname.c_str());if (mp->mechanism() == PUSH || mp->mechanism() == BOTH){// 这里要将该消息重新添加到待拉取消息链表当中_vhost_manager_ptr->insert_pull(vname, qname, mp);}return;}// 3.调用消费者的消费处理回调函数cp->_callback(cp->_consumer_tag, mp->mutable_valid()->mutable_properities(), mp->valid().body());default_info("调用消费者的消费处理回调函数成功 %s",qname.c_str());// 4.如果消费者有自动确认标志,则进行自动确认if (cp->_auto_ack == true){_vhost_manager_ptr->basicAck(vname, qname, mp->valid().properities().msg_id());}
}
3.拉取消息的修改
void basicPull(const BasicPullRequestPtr &req)
{// 1. 拿到该消费者Consumer::ptr cp = _consumer_manager_ptr->getConsumer(req->vhost_name(), req->queue_name(), req->consumer_tag());if(cp.get()==nullptr){default_error("拉取消息失败,因为消费者不存在,消费者tag:%s",req->consumer_tag().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 2. 拿到消息MessagePtr mp = _vhost_manager_ptr->basicPull(req->vhost_name(), req->queue_name());if(mp.get()==nullptr){default_error("拉取消息失败,因为该队列没有待推送消息,队列名:%s",req->queue_name().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 3. 封装异步任务,抛入线程池auto func = [cp, mp, req, this](){// 3. 调用该消费者的消费处理回调函数cp->_callback(cp->_consumer_tag, mp->mutable_valid()->mutable_properities(), mp->valid().body());// 4. auto_ack的问题if (cp->_auto_ack){this->_vhost_manager_ptr->basicAck(req->vhost_name(), req->queue_name(), mp->valid().properities().msg_id());}};_pool_ptr->put(func);// 4. 基础相应basicResponse(req->req_id(),req->channel_id(),true);
}
7.客户端修改
就是给Channel多加一个参数而已
bool BasicPublish(const std::string &vhost_name, const std::string &exchange_name, const BasicProperities *bp, const std::string &body,PublishMechanism mechanism)
{BasicPublishRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_body(body);req.set_mechanism(mechanism);if (bp != nullptr){req.mutable_properities()->set_msg_id(bp->msg_id());req.mutable_properities()->set_mode(bp->mode());req.mutable_properities()->set_routing_key(bp->routing_key());}// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);if (resp->ok()){default_info("发布消息成功 %s",body.c_str());}else{default_info("发布消息失败 %s",body.c_str());}return resp->ok();
}
七、验证
1.消息拉取功能与恢复功能联合测试
我们的验证方式是:
- 先让生产者跑,然后再让消费者跑,消费者能够拉取消息,则成功
- 让生产者跑,制造持久化未确认消息,然后服务器重启(恢复历史消息),然后消费者跑,消费者能够拉取消息,则成功
1.测试1
1.生产者
消息的发布机制就给PUSH了
#include "connection.hpp"
using namespace ns_mq;
#include <thread>
#include <vector>
using namespace std;// host1
void publisher1(const Connection::ptr &conn, const std::string &thread_name)
{// 1. 创建信道Channel::ptr cp = conn->getChannel();// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("host1", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host1", "queue1", true, false, false, {});cp->declareMsgQueue("host1", "queue2", true, false, false, {});cp->bind("host1", "exchange1", "queue1", "news.sport.#");cp->bind("host1", "exchange1", "queue2", "news.*.zhangsan");// 3. 发送10条消息BasicProperities bp;bp.set_mode(DURABLE);bp.set_routing_key("news.sport.basketball");for (int i = 0; i < 10; i++){bp.set_msg_id(UUIDHelper::uuid());cp->BasicPublish("host1", "exchange1", &bp, "Hello -" + std::to_string(i), PUSH);}// 4. 关闭信道conn->returnChannel(cp);
}// host2
void publisher2(const Connection::ptr &conn, const std::string &thread_name)
{// 1. 创建信道Channel::ptr cp = conn->getChannel();// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host2", "./host2/resource.db", "./host2/message");cp->declareExchange("host2", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host2", "queue1", true, false, false, {});cp->declareMsgQueue("host2", "queue2", true, false, false, {});cp->bind("host2", "exchange1", "queue1", "news.sport.#");cp->bind("host2", "exchange1", "queue2", "news.*.zhangsan");// 3. 发送10条消息BasicProperities bp;bp.set_mode(DURABLE);bp.set_routing_key("news.sport.basketball");for (int i = 0; i < 10; i++){bp.set_msg_id(UUIDHelper::uuid());cp->BasicPublish("host2", "exchange1", &bp, "Hello -" + std::to_string(i), PUSH);}// 4. 关闭信道conn->returnChannel(cp);
}int main()
{AsyncWorker::ptr worker = std::make_shared<AsyncWorker>();Connection::ptr myconn = std::make_shared<Connection>("127.0.0.1", 8888, worker);vector<thread> thread_v;thread_v.push_back(thread(publisher1, myconn, "thread1"));thread_v.push_back(thread(publisher2, myconn, "thread2"));for (auto &t : thread_v)t.join();return 0;
}
2.消费者
订阅完队列之后,每隔1s拉取一次消息
#include "connection.hpp"
using namespace ns_mq;
#include <thread>
#include <vector>
#include <thread>
using namespace std;// 因为要拿到信道才能进行确认,所以这里需要把Channel::ptr bind过来
void Callback(const Channel::ptr &cp, const std::string &consumer_tag, const BasicProperities *bp, const std::string &body)
{// 1. 消费消息std::string id;if (bp != nullptr){id = bp->msg_id();}std::cout << consumer_tag << " 消费了消息: " << body << ", 消息ID: " << id << "\n";// 2. 确认消息if (bp != nullptr)std::cout << cp->BasicAck(id) << "\n";
}void consumer1(const Connection::ptr &conn, const std::string &thread_name)
{Channel::ptr cp = conn->getChannel();default_debug("consumer1: 信道ID:",cp->cid().c_str());// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("host1", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host1", "queue1", true, false, false, {});cp->declareMsgQueue("host1", "queue2", true, false, false, {});cp->bind("host1", "exchange1", "queue1", "news.sport.#");cp->bind("host1", "exchange1", "queue2", "news.*.zhangsan");// 3. 创建消费者cp->BasicConsume("host1", "consumer1", "queue1",std::bind(Callback, cp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), false);// 4. 等待消息while (true){cp->BasicPull();std::this_thread::sleep_for(std::chrono::seconds(1));}// 5. 关闭信道conn->returnChannel(cp);
}void consumer2(const Connection::ptr &conn, const std::string &thread_name)
{Channel::ptr cp = conn->getChannel();default_debug("consumer2: 信道ID:",cp->cid().c_str());// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host2", "./host2/resource.db", "./host2/message");cp->declareExchange("host2", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host2", "queue1", true, false, false, {});cp->declareMsgQueue("host2", "queue2", true, false, false, {});cp->bind("host2", "exchange1", "queue1", "news.sport.#");cp->bind("host2", "exchange1", "queue2", "news.*.zhangsan");// 3. 创建消费者cp->BasicConsume("host2", "consumer2", "queue1",std::bind(Callback, cp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), false);// 4. 等待消息while (true){cp->BasicPull();std::this_thread::sleep_for(std::chrono::seconds(1));}// 5. 关闭信道conn->returnChannel(cp);
}int main()
{AsyncWorker::ptr worker = std::make_shared<AsyncWorker>();// 1. 创建连接和信道Connection::ptr conn = std::make_shared<Connection>("127.0.0.1", 8888, worker);vector<thread> thread_v;thread_v.push_back(thread(consumer1, conn, "thread1"));thread_v.push_back(thread(consumer2, conn, "thread2"));for (auto &t : thread_v)t.join();return 0;
}
3.演示
先让生产者跑,然后再让消费者跑,消费者能够拉取消息,则成功
2.测试2 – 演示
让生产者跑,制造持久化未确认消息,然后服务器重启(恢复历史消息),然后消费者跑,消费者能够拉取消息,则成功
2.PULL测试
我们的验证方式是:客户端纯拉取,所有消息必须是由拉取进行消费的
因为客户端拉取消息是每1s拉取一次,所以拉取消息会持续10s,如果是推送的话,那么一瞬间就会搞定
先让消费者跑,再让生产者跑
只需要把生产者发布消息时的发布机制改一下即可
cp->BasicPublish("host2", "exchange1", &bp, "Hello -" + std::to_string(i), PULL);
演示:
2.BOTH测试
我们在publishCallback当中故意让工作线程等上5s,这样就能让拉取快于推送了
因此:
void publishCallback(const ::std::string &vname, const ::std::string &qname)
{std::this_thread::sleep_for(std::chrono::seconds(5));
//模拟5s后异步线程才开始执行该函数,测试BOTH时使用,用来让拉取快于推送
为了保证生产者主线程退出之前异步工作线程能够执行完这些publishCallback
因此我们让生产者结束之后陷入死循环
// 4. 测试BOTH时:等待异步线程执行完publishCallback
while (true)
{std::this_thread::sleep_for(std::chrono::seconds(1000));
}// 5. 关闭信道
conn->returnChannel(cp);
演示:
验证成功
本篇博客分了两大点来进行扩展,是为了让大家更有一步步的代入感,不至于一上来就这么突兀
所以代码篇幅较大,希望大家理解
动图比较卡顿是因为帧数太少,因为CSDN不支持上传5MB以上的图片,所以只能减帧压缩体积,抱歉
以上就是项目扩展二:消息拉取功能的实现的全部内容
相关文章:

项目扩展二:消息拉取功能的实现
项目扩展二:消息拉取功能的实现 一、回顾一下消息推送功能是如何实现的二、设计消息拉取功能1.服务器如何处理2.定义Request和Response1.定义Request2.proto文件 三、服务器实现消息拉取1.业务模块的实现:信道模块2.消费者管理模块实现O(1)获取消费者1.目…...

C语言6大常用标准库 -- 4.<math.h>
目录 引言 4. C标准库--math.h 4.1 简介 4.2 库变量 4.3 库宏 4.4 库函数 4.5 常用的数学常量 🌈你好呀!我是 程序猿 🌌 2024感谢你的陪伴与支持 ~ 🚀 欢迎一起踏上探险之旅,挖掘无限可能,共同成长&…...

【图像匹配】基于SIFT算法的图像匹配,matlab实现
博主简介:matlab图像代码项目合作(扣扣:3249726188) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 本次案例是基于基于SIFT算法的图像匹配,用matlab实现。 一、案例背景和算法介绍 本…...

C++门迷宫
目录 开头程序程序的流程图程序游玩的效果下一篇博客要说的东西 开头 大家好,我叫这是我58。 程序 #include <iostream> using namespace std; void printmaze(const char strmaze[11][11]) {int i 0;int ia 0;for (; i < 11; i) {for (ia 0; ia <…...
用最通俗易懂的语言和例子讲解三维点云
前言: 我整体的学习顺序是看的按B站那“唯一”的三维点云的视频学习的(翻了好久几乎没有第二个...)对于深度学习部分,由于本人并没有进行学习,所以没有深究。大多数内容都进行了自己的理解并找了很多网络的资源方便理解…...

VM虚拟机下载以及激活
传统的官网已经找不到下载了,这里我将下载好的放在阿里云盘,百度云盘太慢了,懂得都得 阿里云盘分享 下载好了后会是一个exe文件,直接双击运行就可 下载无脑下一步即可,这里不做介绍 下载好了后,需要密钥这里…...
详解Ajax与axios的区别
Ajax与Axios在Web开发中都是用于发送HTTP请求的技术,但它们在多个方面存在显著的差异。以下是对两者区别的详细解析: 1. 技术原理 Ajax:Asynchronous JavaScript and XML(异步JavaScript和XML)的缩写,是一…...
golang学习笔记28——golang中实现多态与面向对象
推荐学习文档 golang应用级os框架,欢迎stargolang应用级os框架使用案例,欢迎star案例:基于golang开发的一款超有个性的旅游计划app经历golang实战大纲golang优秀开发常用开源库汇总想学习更多golang知识,这里有免费的golang学习笔…...

运行 xxxxApplication 时出错。命令行过长。 通过 JAR 清单或通过类路径文件缩短命令行,然后重新运行。
一、问题描述 运行 xxxxApplication 时出错。命令行过长。 通过 JAR 清单或通过类路径文件缩短命令行,然后重新运行。 二、问题分析 在idea中,运行一个springboot项目,在使用大量的库和依赖的时候,会出现报错“命令行过长”&…...
k8s自动清理pod脚本分享
检查会遇到集群节点内存消耗超过90%,我们可以筛选一些可以进行重启的pods,如脚本中涉及svc-开头的,进行触发即重启的shell编写。此项会涉及metrics组件需要安装。 #!/bin/bash# 设置内存使用率阈值为90% MEMORY_THRESHOLD90# 初始化一个数组…...
Go并发编程的高级技巧——请求复制与限流
解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 在一些高性能应用场景中,快速响应是非常重要的目标。例如,当一个应用需要快速响应用户的HTTP请求,或从多个副本中检索数据时,如何优化请求处理成为关键。本文将讨论如何在Go语言中,通过并发和限流机制来实现…...

网站建设模板选择哪种
在选择网站建设模板时,需要考虑多个因素,包括网站的目的、受众、内容类型以及个性化需求等。以下是一些常见的网站建设模板类型,以及它们的特点,希望对你的选择有所帮助。 企业/商务模板: 企业和商务网站通常需要专业、…...
【linux】kill命令
kill 命令在 Linux 和类 Unix 系统中用于向进程发送信号,默认情况下是发送 SIGTERM(信号 15),请求程序终止运行。如果程序没有响应 SIGTERM 信号,可以使用 SIGKILL(信号 9)强制终止进程…...

Python基础 | 在虚拟环境中安装并在指定文件夹中打开Jupyter notebook
在虚拟环境中安装并在指定文件夹中打开Jupyter notebook 前言一、在虚拟环境下安装Jupyter notebook二、在指定路径下打开Jupyter notebook 前言 Jupyter Notebook 是一个基于 Web 的交互式计算环境,主要功能是将代码、文本、数学方程式、可视化和其他相关元素组合…...

1.Spring-容器-注册
一、Bean和获取Bean (1)创建IoC容器: SpringApplication.run(类名.class, args); ConfigurableApplicationContext ioc SpringApplication.run(Spring01IocApplication.class, args); (2)将对象注册到IoC容器中&am…...
Mapper.xml SQL大于小于号转义符
Mapper.xml中写的SQL语句,大于小于号字符直接写会报错,需要变成转义字符 对应如下: Mapper.xml SQL大于小于号转义符...
Linux:进程(三)——进程状态
目录 Linux源代码对进程的描述 R S D T t X Z(进程僵尸) 孤儿进程 Linux源代码对进程的描述 理论上把进程状态大致被分为了:运行、阻塞、挂起。那么,在操作系统中具体是如何描述状态的。(有时候Linux内核也把…...

Effective Java 学习笔记 如何为方法编写文档
目录 方法的文档注解设计的原则 Javadoc常用的文档注释 一些注意细节 通过Javadoc命令生成h5页面 这是第8章Java方法的最后一部分,聚焦为导出的API编写文档注释。 如果要想使得API真正可用,配套的文档是必须的。Java提供了Javadoc这个文档生成工具&…...

TCP四大拥塞控制算法总结
四大算法:1.慢启动,2.拥塞避免,3.拥塞发生,4.快速恢复。 慢启动: 首先连接建好的开始先初始化拥塞窗口cwnd大小为1,表明可以传一个MSS大小的数据。 每当收到一个ACK,cwnd大小加一,…...
深入解析ElasticSearch从基础概念到性能优化指南
一.引言 ElasticSearch是一个分布式的搜索和分析引擎,专为处理大规模的结构化和非结构化数据而设计。它建立在Apache Lucene之上,提供了强大的全文搜索能力、高可用性和实时分析的功能。无论是作为日志分析平台,还是作为数据驱动的应用程序的…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...

【笔记】WSL 中 Rust 安装与测试完整记录
#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统:Ubuntu 24.04 LTS (WSL2)架构:x86_64 (GNU/Linux)Rust 版本:rustc 1.87.0 (2025-05-09)Cargo 版本:cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...

Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制
目录 节点的功能承载层(GATT/Adv)局限性: 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能,如 Configuration …...
Python 高效图像帧提取与视频编码:实战指南
Python 高效图像帧提取与视频编码:实战指南 在音视频处理领域,图像帧提取与视频编码是基础但极具挑战性的任务。Python 结合强大的第三方库(如 OpenCV、FFmpeg、PyAV),可以高效处理视频流,实现快速帧提取、压缩编码等关键功能。本文将深入介绍如何优化这些流程,提高处理…...
基于鸿蒙(HarmonyOS5)的打车小程序
1. 开发环境准备 安装DevEco Studio (鸿蒙官方IDE)配置HarmonyOS SDK申请开发者账号和必要的API密钥 2. 项目结构设计 ├── entry │ ├── src │ │ ├── main │ │ │ ├── ets │ │ │ │ ├── pages │ │ │ │ │ ├── H…...

从物理机到云原生:全面解析计算虚拟化技术的演进与应用
前言:我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM(Java Virtual Machine)让"一次编写,到处运行"成为可能。这个软件层面的虚拟化让我着迷,但直到后来接触VMware和Doc…...
用递归算法解锁「子集」问题 —— LeetCode 78题解析
文章目录 一、题目介绍二、递归思路详解:从决策树开始理解三、解法一:二叉决策树 DFS四、解法二:组合式回溯写法(推荐)五、解法对比 递归算法是编程中一种非常强大且常见的思想,它能够优雅地解决很多复杂的…...

算法—栈系列
一:删除字符串中的所有相邻重复项 class Solution { public:string removeDuplicates(string s) {stack<char> st;for(int i 0; i < s.size(); i){char target s[i];if(!st.empty() && target st.top())st.pop();elsest.push(s[i]);}string ret…...