项目扩展二:消息拉取功能的实现
项目扩展二:消息拉取功能的实现
- 一、回顾一下消息推送功能是如何实现的
- 二、设计消息拉取功能
- 1.服务器如何处理
- 2.定义Request和Response
- 1.定义Request
- 2.proto文件
- 三、服务器实现消息拉取
- 1.业务模块的实现:信道模块
- 2.消费者管理模块实现O(1)获取消费者
- 1.目前的情形
- 2.加一个哈希表?
- 3.如何做?
- 4.代码
- 3.信道模块实现
- 4.broker服务器注册响应业务函数
- 四、客户端修改
- 五、修改消息推送逻辑--设计
- 1.考量
- 2.要不要给BOTH消息推送机制呢?
- 3.设计
- 六、实现
- 1.修改消息proto文件
- 2.修改队列消息管理模块
- 1.成员的修改
- 2.recovery恢复历史消息的修改
- 3.发布消息的修改
- 3.front的修改
- 4.clear的修改
- 3.总体消息管理模块修改
- 1.PublishMessage加一个参数
- 2.获取链表队头消息修改
- 4.虚拟机模块的修改
- 1.发布消息增加一个参数
- 1.虚拟机模块
- 2.虚拟机管理模块
- 2.推送和拉取消息的修改
- 1.虚拟机模块
- 2.虚拟机管理模块
- 5.proto网络通信协议修改
- 1.BasicPublishRequest
- 6.信道模块修改
- 1.发布消息的修改
- 2.publishCallback的修改
- 1.坑点--连锁BUG
- 2.解决方案
- 3.实现接口
- 4.publishCallback的修改
- 3.拉取消息的修改
- 7.客户端修改
- 七、验证
- 1.消息拉取功能与恢复功能联合测试
- 1.测试1
- 1.生产者
- 2.消费者
- 3.演示
- 2.测试2 -- 演示
- 2.PULL测试
- 2.BOTH测试
一、回顾一下消息推送功能是如何实现的
这其中一共有三个角色:
二、设计消息拉取功能
给客户端多提供一个服务:消息拉取服务
其实就是从该消费者订阅的队列当中取出一个消息,推送给该消费者
只不过这个消息拉取是由消费者主动向我们服务器发起请求的
因此,我们要考虑两点:
- 服务器如何主动向消费者发送消息
- 网络通信协议中Request和Response的定义
1.服务器如何处理
要完成:主动向消费者发送消息这一任务,需要两个模块:消费者管理模块和虚拟机模块
- 从消费者管理模块当中取出消费者
- 从虚拟机模块当中取出消息
然后调用该消费者的消费处理回调函数,向客户端发送BasicConsumeResponse给客户端它需要的参数,告诉客户端“你可以消费了”
总体而言,并不难,因为我们早已把具体功能模块化,然后需要使用哪些功能,找对应的负责人【模块句柄】即可
这就是面向对象的模块化编程的独特魅力
2.定义Request和Response
Response的话,依然只需要BasicCommonResponse和BasicConsumeResponse即可
BasicCommonResponse是基础响应,对客户端的每条Request都有ACK
因此,我们只需要完成定义Request的任务即可
1.定义Request
首先,req_id和channel_id必然需要。下面我们根据消息拉取这一功能考虑所需参数
首先,消费者拉取消息,因此我们需要找到该消费者,所以需要vhost_name,queue_name和consumer_tag
有了这三个字段,我们就能从消费者管理模块拿取该消费者对应信息
然后我们就可以推送消息了,无需消费者再提供任何信息
2.proto文件
因此,我们往proto文件当中新增一个message消息体
//8. 消息的拉取
message BasicPullRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string queue_name = 4;string consumer_tag = 5;
}
然后编译
protoc --cpp_out=. mq_proto.proto
三、服务器实现消息拉取
1.业务模块的实现:信道模块
我们的信道是实现具体业务,提供具体服务模块,他内部整合了虚拟机模块和消费者管理模块。
因此我们只需要加一个函数,它的任务就是:
- 根据consumer_tag这三个字段 从消费者管理模块当中取出消费者
- 从虚拟机模块当中取出消息
- 调用该消费者的消费处理回调函数
- 如果该消费者有自动确认标志,则进行自动确认
其中,第3解耦且耗时,第4步我们想要它在第三步结束之后才开始,
我们想要解放信道服务线程,因此我们把第4步跟第3步放到一起交给异步工作线程
2.消费者管理模块实现O(1)获取消费者
我们的消费者管理模块还没有实现获取消费者这一接口,而这一接口在增加了消息拉取功能之后,又很常用
且基于之前的数据结构来实现,效率为O(N),所以我们需要改进一下,提高效率
1.目前的情形
std::vector<Consumer::ptr> _consumer_vec;
size_t _seq;
之前为了实现RR轮转的负载均衡,我们通过vector和一个轮询序号实现了队列消费者管理模块
新增,删除,都是O(N)【因为新增和删除消费者的需求频率并不高,所以没什么大碍】,主要是负载均衡select是O(1),所以设计总体来说还可以
但是目前我们想要增加的是根据consumer_tag来O(1)获取消费者,而vector只能O(N),所以不满足需求
2.加一个哈希表?
这里的哈希表的value_type不能是vector的迭代器,因为vector的扩容和删除都有迭代器失效问题
删除导致的迭代器失效问题还好解决,但是扩容导致的迭代器失效问题不好解决,因为vector的扩容被它通过封装屏蔽掉了,倒是也能检测【通过不断检测capcity()】,不过非常不优雅
所以哈希表的value_type搞成Consumer::ptr的话,虽然可以快速查找,但是这样的话,哈希表的用途就不大的
因为哈希表只能让查询操作变为O(1),但是删除还是O(N),因为vector还是要遍历删除的
3.如何做?
哈希表不能跟vector打好配合,所以我们将vector改为list,将RR轮转的负载均衡变为LRU式的负载均衡
每次select时从队头取最近最少使用的消费者,访问之后放到队尾
4.代码
_consumer_list的队头是最近最少使用的,队尾是最近访问的
因此:
- select获取消费者之后,将该消费者挪到队尾
- get获取消费者之后,将该消费者挪到队尾
- 新增消费者,将该消费者放到队头(因为该消费者的负载必为0)
class QueueConsumerManager
{
public:using ptr = std::shared_ptr<QueueConsumerManager>;QueueConsumerManager(const std::string &vhost_name, const std::string &qname): _vhost_name(vhost_name), _qname(qname) {}// 1. 新增消费者[只有想要订阅当前队列消息的消费者才会调用该函数]Consumer::ptr createConsumer(const std::string &consumer_tag, const ConsumerCallback &callback, bool auto_ack){// 1. 加锁,并查找是否有该消费者std::unique_lock<std::mutex> ulock(_mutex);auto iter_map = _consumer_map.find(consumer_tag);if (iter_map != _consumer_map.end()){iter_type iter_list = iter_map->second; // iter_list是链表的迭代器return *iter_list;}// 2. 从队头插入该消费者Consumer::ptr cp = std::make_shared<Consumer>(consumer_tag, callback, _vhost_name, _qname, auto_ack);_consumer_list.push_front(cp);_consumer_map.insert(std::make_pair(consumer_tag, _consumer_list.begin()));return cp;}void removeConsumer(const std::string &consumer_tag){// 加锁并删除该消费者std::unique_lock<std::mutex> ulock(_mutex);auto iter_map = _consumer_map.find(consumer_tag);if (iter_map == _consumer_map.end())return;iter_type iter_list = iter_map->second; // iter_list是链表的迭代器// 删除_consumer_list.erase(iter_list);_consumer_map.erase(iter_map);}Consumer::ptr selectConsumer(){// 0. 加锁并判断是否为空std::unique_lock<std::mutex> ulock(_mutex);if (_consumer_list.empty()){default_warning("获取消费者失败,因为该队列没有消费者,虚拟机名称:%s ,队列名:",_vhost_name.c_str(),_qname.c_str());return Consumer::ptr();}// 1. 拿到队头消费者Consumer::ptr cp = _consumer_list.front();// 2. 将队头消费者移到队尾_consumer_list.splice(_consumer_list.end(), _consumer_list, _consumer_list.begin());// 因为splice是转移节点,不会导致迭代器失效,所以无需更新哈希表return cp;}bool exist(const std::string &consumer_tag){std::unique_lock<std::mutex> ulock(_mutex);return _consumer_map.count(consumer_tag) > 0;}bool empty(){std::unique_lock<std::mutex> ulock(_mutex);return _consumer_map.empty();}void clear(){std::unique_lock<std::mutex> ulock(_mutex);_consumer_map.clear();}// 支持通过消费者tag来获取消费者,这里用哈希表来提高查询效率// 这里的哈希表的value_type不能是vector的迭代器,因为vector的扩容和删除都有迭代器失效问题// 删除导致的迭代器失效问题还好解决,但是扩容导致的迭代器失效问题不好解决,因为vector的扩容被它通过封装屏蔽掉了,倒是也能检测【通过不断检测capcity()】,不过非常不优雅// 所以哈希表的value_type搞成Consumer::ptr的话,虽然可以快速查找,但是这样的话,哈希表的用途就不大的// 因为哈希表只能让查询操作变为O(1),但是删除还是O(N),因为vector还是要遍历的// 那能否就单纯只有一个vector呢?不能,消费者删除需求并不高,所以曾经我们用的vector,但是有了消息拉取功能之后,获取消费者的需求就很高了// 所以不能只有一个vector,必须要有一个哈希表// 而哈希表不能跟vector打好配合,所以我们将vector改为list,将RR轮转的负载均衡变为LRU式的负载均衡// 每次select时从队头取最近最少使用的消费者,访问之后放到队尾Consumer::ptr getConsumer(const std::string &consumer_tag){std::unique_lock<std::mutex> ulock(_mutex);auto iter_map = _consumer_map.find(consumer_tag);if (iter_map != _consumer_map.end()){iter_type iter_list = iter_map->second; // iter_list是链表的迭代器// 将iter_list移动到队尾_consumer_list.splice(_consumer_list.end(), _consumer_list, iter_list);return *iter_list;}return Consumer::ptr();}private:using iter_type = std::list<Consumer::ptr>::iterator;std::string _vhost_name;std::string _qname;std::mutex _mutex;std::list<Consumer::ptr> _consumer_list;std::unordered_map<std::string, iter_type> _consumer_map;
};
然后在总体消费者管理模块当中添加:
Consumer::ptr getConsumer(const std::string vhost_name, const std::string &qname, const std::string &consumer_tag)
{std::ostringstream oss;oss << "获取消费者失败,因为未能找到该队列消费者管理模块,qname = " << qname << "\n";QueueConsumerManager::ptr qcmp = getQueueConsumerManager(vhost_name, qname, oss);if (qcmp.get() == nullptr){return Consumer::ptr();}return qcmp->getConsumer(consumer_tag);
}
3.信道模块实现
- 拿到消费者和消息
- 封装异步任务,抛入线程池
异步任务:- 调用该消费者的消费处理回调函数
- auto_ack的问题
- 返回基础响应
不要忘了:服务器所描述的消费者的消费处理回调函数的功能仅仅是:
向消费者发送基础消费处理响应BasicConsumeResponse
void basicPull(const BasicPullRequestPtr &req)
{// 1. 拿到该消费者Consumer::ptr cp = _consumer_manager_ptr->getConsumer(req->vhost_name(), req->queue_name(), req->consumer_tag());if(cp.get()==nullptr){default_error("拉取消息失败,因为消费者不存在,消费者tag:%s",req->consumer_tag().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 2. 拿到消息MessagePtr mp = _vhost_manager_ptr->basicConsume(req->vhost_name(), req->queue_name());if(mp.get()==nullptr){default_error("拉取消息失败,因为该队列没有待推送消息,队列名:%s",req->queue_name().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 3. 封装异步任务,抛入线程池auto func = [cp, mp, req, this](){// 3. 调用该消费者的消费处理回调函数cp->_callback(cp->_consumer_tag, mp->mutable_valid()->mutable_properities(), mp->valid().body());// 4. auto_ack的问题if (cp->_auto_ack){this->_vhost_manager_ptr->basicAck(req->vhost_name(), req->queue_name(), mp->valid().properities().msg_id());}};_pool_ptr->put(func);// 4. 基础相应basicResponse(req->req_id(),req->channel_id(),true);
}
4.broker服务器注册响应业务函数
_dispatcher.registerMessageCallback<BasicPullRequest>(std::bind(&Server::OnBasicPull,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
// 8. 消息拉取
void OnBasicPull(const muduo::net::TcpConnectionPtr &conn, const BasicPullRequestPtr &req, muduo::Timestamp)
{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("确认消息时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("确认消息失败,因为获取信道失败");return;}mychannel->basicPull(req);
}
四、客户端修改
客户端只需要修改Channel即可,
// 消息拉取
bool BasicPull()
{if (_consumer.get() == nullptr){default_error("消息拉取失败,该信道没有关联消费者");return false;}BasicPullRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(_consumer->_vhost_name);req.set_consumer_tag(_consumer->_consumer_tag);req.set_queue_name(_consumer->_queue_name);// 发送请求_codec->send(_conn, req);std::ostringstream oss;BasicCommonResponsePtr resp = waitResponse(rid);if (resp->ok()){default_info("消息拉取成功: %s",_consumer->_consumer_tag.c_str());}else{default_info("消息拉取失败: %s",_consumer->_consumer_tag.c_str());}return resp->ok();
}
五、修改消息推送逻辑–设计
1.考量
我们之前的消息推送逻辑是:当生产者发布消息之后,我们会立刻找消费者去推送该消息,如果没有消费者,那么就会丢弃该消息
这个逻辑在之前只有消息推送功能时,是正确的,因为不丢弃消息就是浪费资源
而现在我们实现了消息拉取功能,此时这种情况就可以不丢弃消息,而将其存放到队列当中,等待消费者进行拉取
但是推送的消息要不要跟拉取的消息分割一下呢,
让生产者发布消息时选择是推送的,还是拉取的,还是都有
因为一些消息具有实时性
,更希望快速被处理,就可以放到待推送
当中。而那些对实时性没这么高要求
的,就可以放到待确认
2.要不要给BOTH消息推送机制呢?
BOTH的意思是:这个消息既放到待推送链表当中,又放到待拉取链表当中
因为我们的消息推送是放到异步线程池当中去跑的,所以存在拉取快于推送的情况
哪个快听谁的,如果服务器先推送,那么这个消息就按推送走
如果服务器先拉取,那么这个消息就按拉取走
因为我们拉取之后,会将该消息从推送当中删除
推送之后,会将该消息从拉取当中删除,而且操作都因为加了互斥锁而成为了原子操作
所以不会存在同一消息被消费2次的情形
因为消息被消费只有3种情况:
- 推送(推送时会将消息从拉取当中删除,所以不怕该消息同时被拉取)
- 推送进行之前被拉取(此时推送就找不到消息了,没事)
- 推送失败之后被拉取(推送时会从拉取当中删除,推送失败之后会将消息放到拉取当中,所以没事)
因此BOTH是可以的,所以我们给上BOTH
3.设计
如何修改呢?
消息推送如果失败,也是将其放到待拉取消息链表当中,等待消费者主动拉取
持久化消息恢复之后,将其放到待拉取消息链表当中,等待消费者主动拉取
现在有个问题:
既然持久化的消息在恢复之后是直接放到待拉取消息链表当中的,那么有必要将消息的推送机制一并持久化吗?
没必要
六、实现
1.修改消息proto文件
// 3. 消息的推送机制:推送/拉取/推+拉
enum PublishMechanism
{UNKNOWNMECHANISM = 0;PUSH = 1;PULL = 2;BOTH = 3;
}// 4. 消息的基本属性
message BasicProperities
{string msg_id = 1;DeliveryMode mode = 2;string routing_key = 3;
}message Message
{message ValidLoad{string body = 1;BasicProperities properities = 2;string valid = 3;// 因为bool的true/false在protobuf当中持久化后的长度不同,因此我们不用bool,而是用"0"代表无效,"1"代表有效}ValidLoad valid = 1;uint64 offset = 2;uint64 len = 3; PublishMechanism mechanism = 4;
}
protoc --cpp_out=. mq_msg.proto
2.修改队列消息管理模块
1.成员的修改
内部类
struct iter_node
{using iter_type = std::list<MessagePtr>::iterator;iter_type push_iter;iter_type pull_iter;
};std::list<MessagePtr> _waitpush_list;
std::list<MessagePtr> _waitpull_list;
std::unordered_map<MessagePtr,iter_node> _waitpublish_map;
2.recovery恢复历史消息的修改
把别忘了gc恢复的待拉取消息链表存入_waitpublish_map当中!!
void recovery()
{std::unique_lock<std::mutex> ulock(_mutex);// 1. 恢复历史消息,将消息放入待拉取消息链表当中_waitpull_list = _mapper.gc();// 2. 遍历待拉取消息链表,将其中的消息都放入_waitpublish_map当中for (auto iter_list = _waitpull_list.begin(); iter_list != _waitpull_list.end(); ++iter_list){MessagePtr mp = *iter_list;_waitpublish_map[mp].pull_iter = iter_list;_waitpublish_map[mp].push_iter = _waitpush_list.end();}// 3. 将gc后的消息放到持久化哈希表中for (auto &mp : _waitpull_list){_durable_map[mp->valid().properities().msg_id()] = mp;}// 2. 更新持久化消息总数和有效消息总数_total_count = _valid_count = _durable_map.size();
}
3.发布消息的修改
bool publishMessage(const BasicProperities *bp, const std::string &body, DeliveryMode mode, PublishMechanism mechanism)
{// 消息能否持久化取决于队列是否持久化,因此消息的持久化与否不仅要看bp当中的mode,还要看DeliveryMode mode// 只有当DeliveryMode mode是持久化时,才看bp当中的mode,否则一律不持久化// 1. 构建消息智能指针MessagePtr mp = std::make_shared<Message>();mp->mutable_valid()->set_body(body);mp->mutable_valid()->set_valid("1");mp->mutable_valid()->mutable_properities()->set_msg_id(bp->msg_id());mp->mutable_valid()->mutable_properities()->set_routing_key(bp->routing_key());DeliveryMode final_mode = (mode == DURABLE && bp->mode() == DURABLE) ? DURABLE : UNDURABLE;mp->mutable_valid()->mutable_properities()->set_mode(final_mode);mp->set_mechanism(mechanism);// 加锁std::unique_lock<std::mutex> ulock(_mutex);// 2. 看是否需要持久化if (final_mode == DURABLE){if (!_mapper.insert(mp)){default_error("发布消息失败, 因为消息持久化失败, 消息ID: %s",bp->msg_id().c_str());return false;}// 放到持久化哈希表中_durable_map[bp->msg_id()] = mp;_total_count++;_valid_count++;}// 3. 根据消息发布机制来进行消息存储// 1. 放到待推送链表if (mechanism == PUSH){_waitpush_list.push_back(mp);_waitpublish_map[mp].push_iter = std::prev(_waitpush_list.end());_waitpublish_map[mp].pull_iter = _waitpull_list.end();}else if (mechanism == PULL){_waitpull_list.push_back(mp);_waitpublish_map[mp].push_iter = _waitpush_list.end();_waitpublish_map[mp].pull_iter = std::prev(_waitpull_list.end());}else if (mechanism == BOTH){_waitpush_list.push_back(mp);_waitpull_list.push_back(mp);_waitpublish_map[mp].push_iter = std::prev(_waitpush_list.end());_waitpublish_map[mp].pull_iter = std::prev(_waitpull_list.end());}else{default_error("发布消息失败,因为消息的发布机制未知, 消息ID: %s",bp->msg_id().c_str());return false;}return true;
}
3.front的修改
抽离出一个共用函数:
MessagePtr front(std::list<MessagePtr> &main_list, std::list<MessagePtr> &sub_list, bool ispush)
{std::unique_lock<std::mutex> ulock(_mutex);// 0.加锁并判空if (main_list.empty()){return MessagePtr();}// 1.从链表取消息,设置待确认状态MessagePtr mp = main_list.front();main_list.pop_front();_waitack_map[mp->valid().properities().msg_id()] = mp;// 2.在另一个链表当中进行删除auto iter_hash = _waitpublish_map.find(mp);// 假设它是push,那么另一个链表就是pulliter_node::iter_type iter_list = iter_hash->second.pull_iter;if (!ispush){iter_list = iter_hash->second.push_iter;}if (iter_list != sub_list.end()){sub_list.erase(iter_list);}// 3.在哈希表当中删除_waitpublish_map.erase(iter_hash);return mp;
}
从待推送链表取消息:
MessagePtr push_list_front()
{return front(_waitpush_list,_waitpull_list,true);
}
从待拉取链表取消息:
MessagePtr pull_list_front()
{return front(_waitpull_list,_waitpush_list,false);
}
4.clear的修改
// 需要提供销毁该队列所有信息的方法(删除队列时要用)
void clear()
{std::unique_lock<std::mutex> ulock(_mutex);_mapper.removeFile();_waitpush_list.clear();_waitpull_list.clear();_waitpublish_map.clear();_waitack_map.clear();_durable_map.clear();_valid_count = _total_count = 0;
}
3.总体消息管理模块修改
1.PublishMessage加一个参数
bool publishMessage(const std::string &qname, const BasicProperities *bp, const std::string &body, DeliveryMode mode, PublishMechanism mechanism)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("发布消息失败,因为该队列的消息管理模块句柄尚未初始化");return false;}qmmp = iter->second;}return qmmp->publishMessage(bp, body, mode, mechanism);
}
2.获取链表队头消息修改
MessagePtr push_list_front(const std::string &qname)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("获取待推送消息失败,因为该队列的消息管理模块句柄尚未初始化");return MessagePtr();}qmmp = iter->second;}return qmmp->push_list_front();
}MessagePtr pull_list_front(const std::string &qname)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("获取待拉取消息失败,因为该队列的消息管理模块句柄尚未初始化");return MessagePtr();}qmmp = iter->second;}return qmmp->pull_list_front();
}
至此,消息模块就搞完了,下面顺着这个层状结构往上找,去修改虚拟机
4.虚拟机模块的修改
1.发布消息增加一个参数
1.虚拟机模块
bool basicPublish(const std::string &qname, const BasicProperities *bp, const std::string &body,PublishMechanism mechanism)
{// 在这里能够知道队列的持久化方式,因此就能够传递durable了// 1. 查找该队列的ptr,看是否存在,拿到durable// 2. 发布消息MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("发布消息失败,因为该队列不存在, 队列名: %s",qname.c_str());return false;}return _mmp->publishMessage(qname, bp, body, (mqp->durable) ? DURABLE : UNDURABLE,mechanism);
}
2.虚拟机管理模块
bool basicPublish(const std::string &vname, const std::string &qname, const BasicProperities *bp, const std::string &body, PublishMechanism mechanism)
{std::ostringstream oss;oss << "发布消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->basicPublish(qname, bp, body, mechanism);
}
2.推送和拉取消息的修改
1.虚拟机模块
// 推送[消费]消息
MessagePtr basicConsume(const std::string &qname)
{return _mmp->push_list_front(qname);
}// 拉取消息
MessagePtr basicPull(const std::string &qname)
{return _mmp->pull_list_front(qname);
}
2.虚拟机管理模块
// 推送[消费]消息
MessagePtr basicConsume(const std::string &vname, const std::string &qname)
{std::ostringstream oss;oss << "推送消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MessagePtr();}return vhp->basicConsume(qname);
}// 拉取消息
MessagePtr basicPull(const std::string &vname, const std::string &qname)
{std::ostringstream oss;oss << "拉取消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MessagePtr();}return vhp->basicPull(qname);
}
然后虚拟机模块就搞定了,再往上走,修改信道
5.proto网络通信协议修改
因为信道是网络服务模块,且生产者要提供的参数多了一个,所以我们需要修改一下proto网络通信协议
1.BasicPublishRequest
其实就是给BasicPublishRequest加一个PublishMechanism 字段而已
//7. 消息的发布与确认
message BasicPublishRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string exchange_name = 4; //需要用户指定:消息发布到哪个交换机上,然后我们给他进行路由匹配,放到对应队列当中string body = 5;BasicProperities properities = 6;PublishMechanism mechanism = 7;// 消息的发布方式
}
protoc --cpp_out=. mq_proto.proto
6.信道模块修改
1.发布消息的修改
- 给basicPublish多传一个参数req->mechanism()
- 只有当该消息是PUSH的发布机制时,才需要将publishCallback封装为异步任务抛入线程池
void basicPublish(const BasicPublishRequestPtr &req)
{// 1. 先找到该交换机的交换机类型Exchange::ptr ep = _vhost_manager_ptr->getExchange(req->vhost_name(), req->exchange_name());if (ep.get() == nullptr){default_error("发布消息失败,因为交换机不存在\n,交换机名称:%s",req->exchange_name().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 2. 先找到消息发布的交换机 绑定的所有队列MsgQueueBindingMap qmap = _vhost_manager_ptr->getAllBindingsByExchange(req->vhost_name(), req->exchange_name());// 3. 遍历所有队列,进行路由匹配与消息投递for (auto &kv : qmap){Binding::ptr bp = kv.second;BasicProperities *properities = nullptr;::std::string routing_key;if (req->has_properities()){properities = req->mutable_properities();routing_key = properities->routing_key();}if (Router::route(routing_key, bp->binding_key, ep->type)){// 把消息投递到指定队列_vhost_manager_ptr->basicPublish(req->vhost_name(), bp->queue_name, properities, req->body(), req->mechanism());// 判断该消息是否需要推送if (req->mechanism() == PUSH || req->mechanism() == BOTH){// 5. 向线程池添加一个消息消费任务,消费任务交给线程池中的线程去做,解放Channel线程去做更重要的任务auto func = ::std::bind(&Channel::publishCallback, this, req->vhost_name(), bp->queue_name);_pool_ptr->put(func);}}}// 返回响应即可basicResponse(req->req_id(), req->channel_id(), true);
}
2.publishCallback的修改
注意注意:
- 当publishCallback推送消息找不到消费者时,要将该消息放到待拉取消息链表当中
- 放过去的时候,将消息推送机制改为PULL
1.坑点–连锁BUG
这里有一个连锁BUG问题:
我们复用basicPublish的时候,会复用到消息管理模块当中的新增消息
如果我们的消息是持久化的,那么就会重复持久化,持久化时又会修改消息结构体当中的offset字段,因此ACK的时候就只能删除持久化消息副本,而无法删除原件
此时就BUG了:那么我们想当然地就会这么想:
那我把DeliveryMode改成UNDURABLE不就行了?
mp->mutable_valid()->mutable_properities()->set_mode(UNDURABLE);
不行的,因为我们ACK的时候,是否需要删除持久化消息是看该消息的DeliveryMode
因此这样的话,ACK时就无法删除该消息了,也是BUG
怎么办呢?
2.解决方案
-
从需求解决问题:
我们想要的其实就是把一个MessagePtr放到待拉取消息链表当中,因此让消息管理模块提供这么一个接口不就行了吗 -
从复用方面解决问题:
我们依然选择持久化,但是在调用basicPublish之前先调用一下basicAck,将原件先ACK了 -
从ACK方面方面解决问题:
修改ACK:是否删除持久化消息不依据DeliveryMode,而是依赖于该消息是否在持久化哈希表当中
下面我们进行选择:
- 效率:第2种由于需要复用ACK,也就需要进行IO操作,效率低,所以淘汰第2种
- 业务/代码优雅角度:第3种必须依赖于持久化哈希表,而不能依赖于DeliveryMode,不好,而且修改DeliveryMode不利于排查BUG时的调试,不好
因此,我们选择第1种
3.实现接口
QueueMessageManager:
// 提供向待拉取消息链表当中插入数据
void insert_pull(const MessagePtr& mp)
{std::unique_lock<std::mutex> ulock(_mutex);_waitpull_list.push_back(mp);_waitpublish_map[mp].pull_iter=std::prev(_waitpull_list.end());_waitpublish_map[mp].push_iter=_waitpush_list.end();
}
MessageManager:
// 提供向待拉取消息链表当中插入数据
bool insert_pull(const std::string &qname, const MessagePtr &mp)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("发布消息失败,因为该队列的消息管理模块句柄尚未初始化");return false;}qmmp = iter->second;}qmmp->insert_pull(mp);return true;
}
VirtualHost:
// 将消息放入待拉取消息链表当中
bool insert_pull(const std::string& qname,const MessagePtr& mp)
{return _mmp->insert_pull(qname,mp);
}
VirtualHostManager:
// 将消息放入待拉取消息链表当中
bool insert_pull(const std::string &vname, const std::string &qname, const MessagePtr &mp)
{std::ostringstream oss;oss << "将消息放入待拉取消息链表失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->insert_pull(qname, mp);
}
4.publishCallback的修改
// 推送消息(取出消息,取出消费者,调用对应消费者的消费处理回调函数)
// 不能先取消费者,因为那样会导致 在无消费者的情况下,待推送消息在链表当中堆积的情况
// 而通过先取消息,再取消费者,将消息放到待拉取消息链表当中,等待有消费者拉取
void publishCallback(const ::std::string &vname, const ::std::string &qname)
{// 1.取出消息MessagePtr mp = _vhost_manager_ptr->basicConsume(vname, qname);if (mp.get() == nullptr){default_info("消息的消费失败, 因为消息队列为空,没有消息: %s",qname.c_str());return;}// 2.取出消费者Consumer::ptr cp = _consumer_manager_ptr->selectConsumer(vname, qname);if (cp.get() == nullptr){default_info("该队列中暂无消费者,将该消息放入该队列的待拉取消息链表当中 %s",qname.c_str());if (mp->mechanism() == PUSH || mp->mechanism() == BOTH){// 这里要将该消息重新添加到待拉取消息链表当中_vhost_manager_ptr->insert_pull(vname, qname, mp);}return;}// 3.调用消费者的消费处理回调函数cp->_callback(cp->_consumer_tag, mp->mutable_valid()->mutable_properities(), mp->valid().body());default_info("调用消费者的消费处理回调函数成功 %s",qname.c_str());// 4.如果消费者有自动确认标志,则进行自动确认if (cp->_auto_ack == true){_vhost_manager_ptr->basicAck(vname, qname, mp->valid().properities().msg_id());}
}
3.拉取消息的修改
void basicPull(const BasicPullRequestPtr &req)
{// 1. 拿到该消费者Consumer::ptr cp = _consumer_manager_ptr->getConsumer(req->vhost_name(), req->queue_name(), req->consumer_tag());if(cp.get()==nullptr){default_error("拉取消息失败,因为消费者不存在,消费者tag:%s",req->consumer_tag().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 2. 拿到消息MessagePtr mp = _vhost_manager_ptr->basicPull(req->vhost_name(), req->queue_name());if(mp.get()==nullptr){default_error("拉取消息失败,因为该队列没有待推送消息,队列名:%s",req->queue_name().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 3. 封装异步任务,抛入线程池auto func = [cp, mp, req, this](){// 3. 调用该消费者的消费处理回调函数cp->_callback(cp->_consumer_tag, mp->mutable_valid()->mutable_properities(), mp->valid().body());// 4. auto_ack的问题if (cp->_auto_ack){this->_vhost_manager_ptr->basicAck(req->vhost_name(), req->queue_name(), mp->valid().properities().msg_id());}};_pool_ptr->put(func);// 4. 基础相应basicResponse(req->req_id(),req->channel_id(),true);
}
7.客户端修改
就是给Channel多加一个参数而已
bool BasicPublish(const std::string &vhost_name, const std::string &exchange_name, const BasicProperities *bp, const std::string &body,PublishMechanism mechanism)
{BasicPublishRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_body(body);req.set_mechanism(mechanism);if (bp != nullptr){req.mutable_properities()->set_msg_id(bp->msg_id());req.mutable_properities()->set_mode(bp->mode());req.mutable_properities()->set_routing_key(bp->routing_key());}// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);if (resp->ok()){default_info("发布消息成功 %s",body.c_str());}else{default_info("发布消息失败 %s",body.c_str());}return resp->ok();
}
七、验证
1.消息拉取功能与恢复功能联合测试
我们的验证方式是:
- 先让生产者跑,然后再让消费者跑,消费者能够拉取消息,则成功
- 让生产者跑,制造持久化未确认消息,然后服务器重启(恢复历史消息),然后消费者跑,消费者能够拉取消息,则成功
1.测试1
1.生产者
消息的发布机制就给PUSH了
#include "connection.hpp"
using namespace ns_mq;
#include <thread>
#include <vector>
using namespace std;// host1
void publisher1(const Connection::ptr &conn, const std::string &thread_name)
{// 1. 创建信道Channel::ptr cp = conn->getChannel();// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("host1", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host1", "queue1", true, false, false, {});cp->declareMsgQueue("host1", "queue2", true, false, false, {});cp->bind("host1", "exchange1", "queue1", "news.sport.#");cp->bind("host1", "exchange1", "queue2", "news.*.zhangsan");// 3. 发送10条消息BasicProperities bp;bp.set_mode(DURABLE);bp.set_routing_key("news.sport.basketball");for (int i = 0; i < 10; i++){bp.set_msg_id(UUIDHelper::uuid());cp->BasicPublish("host1", "exchange1", &bp, "Hello -" + std::to_string(i), PUSH);}// 4. 关闭信道conn->returnChannel(cp);
}// host2
void publisher2(const Connection::ptr &conn, const std::string &thread_name)
{// 1. 创建信道Channel::ptr cp = conn->getChannel();// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host2", "./host2/resource.db", "./host2/message");cp->declareExchange("host2", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host2", "queue1", true, false, false, {});cp->declareMsgQueue("host2", "queue2", true, false, false, {});cp->bind("host2", "exchange1", "queue1", "news.sport.#");cp->bind("host2", "exchange1", "queue2", "news.*.zhangsan");// 3. 发送10条消息BasicProperities bp;bp.set_mode(DURABLE);bp.set_routing_key("news.sport.basketball");for (int i = 0; i < 10; i++){bp.set_msg_id(UUIDHelper::uuid());cp->BasicPublish("host2", "exchange1", &bp, "Hello -" + std::to_string(i), PUSH);}// 4. 关闭信道conn->returnChannel(cp);
}int main()
{AsyncWorker::ptr worker = std::make_shared<AsyncWorker>();Connection::ptr myconn = std::make_shared<Connection>("127.0.0.1", 8888, worker);vector<thread> thread_v;thread_v.push_back(thread(publisher1, myconn, "thread1"));thread_v.push_back(thread(publisher2, myconn, "thread2"));for (auto &t : thread_v)t.join();return 0;
}
2.消费者
订阅完队列之后,每隔1s拉取一次消息
#include "connection.hpp"
using namespace ns_mq;
#include <thread>
#include <vector>
#include <thread>
using namespace std;// 因为要拿到信道才能进行确认,所以这里需要把Channel::ptr bind过来
void Callback(const Channel::ptr &cp, const std::string &consumer_tag, const BasicProperities *bp, const std::string &body)
{// 1. 消费消息std::string id;if (bp != nullptr){id = bp->msg_id();}std::cout << consumer_tag << " 消费了消息: " << body << ", 消息ID: " << id << "\n";// 2. 确认消息if (bp != nullptr)std::cout << cp->BasicAck(id) << "\n";
}void consumer1(const Connection::ptr &conn, const std::string &thread_name)
{Channel::ptr cp = conn->getChannel();default_debug("consumer1: 信道ID:",cp->cid().c_str());// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("host1", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host1", "queue1", true, false, false, {});cp->declareMsgQueue("host1", "queue2", true, false, false, {});cp->bind("host1", "exchange1", "queue1", "news.sport.#");cp->bind("host1", "exchange1", "queue2", "news.*.zhangsan");// 3. 创建消费者cp->BasicConsume("host1", "consumer1", "queue1",std::bind(Callback, cp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), false);// 4. 等待消息while (true){cp->BasicPull();std::this_thread::sleep_for(std::chrono::seconds(1));}// 5. 关闭信道conn->returnChannel(cp);
}void consumer2(const Connection::ptr &conn, const std::string &thread_name)
{Channel::ptr cp = conn->getChannel();default_debug("consumer2: 信道ID:",cp->cid().c_str());// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host2", "./host2/resource.db", "./host2/message");cp->declareExchange("host2", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host2", "queue1", true, false, false, {});cp->declareMsgQueue("host2", "queue2", true, false, false, {});cp->bind("host2", "exchange1", "queue1", "news.sport.#");cp->bind("host2", "exchange1", "queue2", "news.*.zhangsan");// 3. 创建消费者cp->BasicConsume("host2", "consumer2", "queue1",std::bind(Callback, cp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), false);// 4. 等待消息while (true){cp->BasicPull();std::this_thread::sleep_for(std::chrono::seconds(1));}// 5. 关闭信道conn->returnChannel(cp);
}int main()
{AsyncWorker::ptr worker = std::make_shared<AsyncWorker>();// 1. 创建连接和信道Connection::ptr conn = std::make_shared<Connection>("127.0.0.1", 8888, worker);vector<thread> thread_v;thread_v.push_back(thread(consumer1, conn, "thread1"));thread_v.push_back(thread(consumer2, conn, "thread2"));for (auto &t : thread_v)t.join();return 0;
}
3.演示
先让生产者跑,然后再让消费者跑,消费者能够拉取消息,则成功
2.测试2 – 演示
让生产者跑,制造持久化未确认消息,然后服务器重启(恢复历史消息),然后消费者跑,消费者能够拉取消息,则成功
2.PULL测试
我们的验证方式是:客户端纯拉取,所有消息必须是由拉取进行消费的
因为客户端拉取消息是每1s拉取一次,所以拉取消息会持续10s,如果是推送的话,那么一瞬间就会搞定
先让消费者跑,再让生产者跑
只需要把生产者发布消息时的发布机制改一下即可
cp->BasicPublish("host2", "exchange1", &bp, "Hello -" + std::to_string(i), PULL);
演示:
2.BOTH测试
我们在publishCallback当中故意让工作线程等上5s,这样就能让拉取快于推送了
因此:
void publishCallback(const ::std::string &vname, const ::std::string &qname)
{std::this_thread::sleep_for(std::chrono::seconds(5));
//模拟5s后异步线程才开始执行该函数,测试BOTH时使用,用来让拉取快于推送
为了保证生产者主线程退出之前异步工作线程能够执行完这些publishCallback
因此我们让生产者结束之后陷入死循环
// 4. 测试BOTH时:等待异步线程执行完publishCallback
while (true)
{std::this_thread::sleep_for(std::chrono::seconds(1000));
}// 5. 关闭信道
conn->returnChannel(cp);
演示:
验证成功
本篇博客分了两大点来进行扩展,是为了让大家更有一步步的代入感,不至于一上来就这么突兀
所以代码篇幅较大,希望大家理解
动图比较卡顿是因为帧数太少,因为CSDN不支持上传5MB以上的图片,所以只能减帧压缩体积,抱歉
以上就是项目扩展二:消息拉取功能的实现的全部内容
相关文章:

项目扩展二:消息拉取功能的实现
项目扩展二:消息拉取功能的实现 一、回顾一下消息推送功能是如何实现的二、设计消息拉取功能1.服务器如何处理2.定义Request和Response1.定义Request2.proto文件 三、服务器实现消息拉取1.业务模块的实现:信道模块2.消费者管理模块实现O(1)获取消费者1.目…...

C语言6大常用标准库 -- 4.<math.h>
目录 引言 4. C标准库--math.h 4.1 简介 4.2 库变量 4.3 库宏 4.4 库函数 4.5 常用的数学常量 🌈你好呀!我是 程序猿 🌌 2024感谢你的陪伴与支持 ~ 🚀 欢迎一起踏上探险之旅,挖掘无限可能,共同成长&…...

【图像匹配】基于SIFT算法的图像匹配,matlab实现
博主简介:matlab图像代码项目合作(扣扣:3249726188) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 本次案例是基于基于SIFT算法的图像匹配,用matlab实现。 一、案例背景和算法介绍 本…...

C++门迷宫
目录 开头程序程序的流程图程序游玩的效果下一篇博客要说的东西 开头 大家好,我叫这是我58。 程序 #include <iostream> using namespace std; void printmaze(const char strmaze[11][11]) {int i 0;int ia 0;for (; i < 11; i) {for (ia 0; ia <…...
用最通俗易懂的语言和例子讲解三维点云
前言: 我整体的学习顺序是看的按B站那“唯一”的三维点云的视频学习的(翻了好久几乎没有第二个...)对于深度学习部分,由于本人并没有进行学习,所以没有深究。大多数内容都进行了自己的理解并找了很多网络的资源方便理解…...

VM虚拟机下载以及激活
传统的官网已经找不到下载了,这里我将下载好的放在阿里云盘,百度云盘太慢了,懂得都得 阿里云盘分享 下载好了后会是一个exe文件,直接双击运行就可 下载无脑下一步即可,这里不做介绍 下载好了后,需要密钥这里…...

详解Ajax与axios的区别
Ajax与Axios在Web开发中都是用于发送HTTP请求的技术,但它们在多个方面存在显著的差异。以下是对两者区别的详细解析: 1. 技术原理 Ajax:Asynchronous JavaScript and XML(异步JavaScript和XML)的缩写,是一…...

golang学习笔记28——golang中实现多态与面向对象
推荐学习文档 golang应用级os框架,欢迎stargolang应用级os框架使用案例,欢迎star案例:基于golang开发的一款超有个性的旅游计划app经历golang实战大纲golang优秀开发常用开源库汇总想学习更多golang知识,这里有免费的golang学习笔…...

运行 xxxxApplication 时出错。命令行过长。 通过 JAR 清单或通过类路径文件缩短命令行,然后重新运行。
一、问题描述 运行 xxxxApplication 时出错。命令行过长。 通过 JAR 清单或通过类路径文件缩短命令行,然后重新运行。 二、问题分析 在idea中,运行一个springboot项目,在使用大量的库和依赖的时候,会出现报错“命令行过长”&…...

k8s自动清理pod脚本分享
检查会遇到集群节点内存消耗超过90%,我们可以筛选一些可以进行重启的pods,如脚本中涉及svc-开头的,进行触发即重启的shell编写。此项会涉及metrics组件需要安装。 #!/bin/bash# 设置内存使用率阈值为90% MEMORY_THRESHOLD90# 初始化一个数组…...

Go并发编程的高级技巧——请求复制与限流
解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 在一些高性能应用场景中,快速响应是非常重要的目标。例如,当一个应用需要快速响应用户的HTTP请求,或从多个副本中检索数据时,如何优化请求处理成为关键。本文将讨论如何在Go语言中,通过并发和限流机制来实现…...

网站建设模板选择哪种
在选择网站建设模板时,需要考虑多个因素,包括网站的目的、受众、内容类型以及个性化需求等。以下是一些常见的网站建设模板类型,以及它们的特点,希望对你的选择有所帮助。 企业/商务模板: 企业和商务网站通常需要专业、…...

【linux】kill命令
kill 命令在 Linux 和类 Unix 系统中用于向进程发送信号,默认情况下是发送 SIGTERM(信号 15),请求程序终止运行。如果程序没有响应 SIGTERM 信号,可以使用 SIGKILL(信号 9)强制终止进程…...

Python基础 | 在虚拟环境中安装并在指定文件夹中打开Jupyter notebook
在虚拟环境中安装并在指定文件夹中打开Jupyter notebook 前言一、在虚拟环境下安装Jupyter notebook二、在指定路径下打开Jupyter notebook 前言 Jupyter Notebook 是一个基于 Web 的交互式计算环境,主要功能是将代码、文本、数学方程式、可视化和其他相关元素组合…...

1.Spring-容器-注册
一、Bean和获取Bean (1)创建IoC容器: SpringApplication.run(类名.class, args); ConfigurableApplicationContext ioc SpringApplication.run(Spring01IocApplication.class, args); (2)将对象注册到IoC容器中&am…...

Mapper.xml SQL大于小于号转义符
Mapper.xml中写的SQL语句,大于小于号字符直接写会报错,需要变成转义字符 对应如下: Mapper.xml SQL大于小于号转义符...

Linux:进程(三)——进程状态
目录 Linux源代码对进程的描述 R S D T t X Z(进程僵尸) 孤儿进程 Linux源代码对进程的描述 理论上把进程状态大致被分为了:运行、阻塞、挂起。那么,在操作系统中具体是如何描述状态的。(有时候Linux内核也把…...

Effective Java 学习笔记 如何为方法编写文档
目录 方法的文档注解设计的原则 Javadoc常用的文档注释 一些注意细节 通过Javadoc命令生成h5页面 这是第8章Java方法的最后一部分,聚焦为导出的API编写文档注释。 如果要想使得API真正可用,配套的文档是必须的。Java提供了Javadoc这个文档生成工具&…...

TCP四大拥塞控制算法总结
四大算法:1.慢启动,2.拥塞避免,3.拥塞发生,4.快速恢复。 慢启动: 首先连接建好的开始先初始化拥塞窗口cwnd大小为1,表明可以传一个MSS大小的数据。 每当收到一个ACK,cwnd大小加一,…...

深入解析ElasticSearch从基础概念到性能优化指南
一.引言 ElasticSearch是一个分布式的搜索和分析引擎,专为处理大规模的结构化和非结构化数据而设计。它建立在Apache Lucene之上,提供了强大的全文搜索能力、高可用性和实时分析的功能。无论是作为日志分析平台,还是作为数据驱动的应用程序的…...

git分支合并时忽略指定文件
分支合并忽略特定文件步骤 1.在项目根目录下cmd窗口运行以下命令 git config merge.ours.driver true2.在项目根目录下新建文件.gitattributes然后文件中写入需要忽略的文件名 mergeours, 一个文件占一行 Dockerfile mergeours /nginx/default.conf mergeours...

基于微信小程序的童装商城的设计与实现+ssm(lw+演示+源码+运行)
童装商城小程序 摘 要 随着移动应用技术的发展,越来越多的用户借助于移动手机、电脑完成生活中的事务,许多的传统行业也更加重视与互联网的结合,由于城镇人口的增加,人们去商场购物总是排着长长的队伍,对于时间紧的人…...

什么叫后验分布
后验分布(Posterior Distribution)是在贝叶斯统计中一个重要的概念。它指的是在观测到数据之后,对参数或潜变量的分布的更新。具体来说,后验分布是基于先验分布(Prior Distribution)和似然函数(…...

Godot游戏如何提升触感体验
在游戏世界中,触感体验至关重要,既能极大提升玩家沉浸感,让其深度融入游戏,在操作角色或与环境互动时,通过触感反馈获得身临其境的真实感(比如动作游戏中角色攻击或受击时的振动反馈,能使玩家更…...

数字图像面积计算一般方法及MATLAB实现
一、引言 在数字图像处理中,经常需要获取感兴趣区域的面积属性,下面给出图像处理的一般步骤。 1.读入的彩色图像 2.将彩色图像转化为灰度图像 3.灰度图像转化为二值图像 4.区域标记 5.对每个区域的面积进行计算和显示 二、程序代码 %面积计算 cle…...

【STL】 set 与 multiset:基础、操作与应用
在 C 标准库中,set 和 multiset 是两个非常常见的关联容器,主要用于存储和管理具有一定规则的数据集合。本文将详细讲解如何使用这两个容器,并结合实例代码,分析其操作和特性。 0.基础操作概览 0.1.构造: set<T&…...

xhs 小红书 x-s web 分析
声明: 本文章中所有内容仅供学习交流使用,不用于其他任何目的,抓包内容、敏感网址、数据接口等均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关! 有相关问题请第一时间头像私信联系我…...

胤娲科技:谷歌DeepMind祭出蛋白质设计新AI——癌症治疗迎来曙光
在科技的浩瀚星空中,DeepMind的“阿尔法”家族总是能带来令人瞩目的璀璨光芒。这一次,它们再次以惊人的姿态, 将AI的触角深入到了生命的微观世界——蛋白质设计领域,为我们描绘了一幅未来医疗的宏伟蓝图。 想象一下,一…...

【后端】【nginx】nginx常用命令
文章目录 1. 启动与停止相关命令2. 配置文件检查与验证3. 查看日志4. 查看状态与版本5. 端口与连接相关命令 1. 启动与停止相关命令 # 启动 NGINX sudo nginx# 立即停止 NGINX sudo nginx -s stop# 优雅停止 NGINX sudo nginx -s quit# 优雅重载配置 sudo nginx -s reload# 完…...

MATLAB系列08:输入/输入函数
MATLAB系列08:输入/输入函数 8. 输入/输入函数8.1 函数textread8.2 关于load和save命令的进一步说明8.3 MATLAB文件过程简介8.4 文件的打开和关闭8.4.1 fopen函数8.4.2 fclose函数 8.5 二进制 I/O 函数8.5.1 fwrite 函数8.5.2 fread函数 8.6 格式化 I/O 函数8.6.1 f…...