高性能分布式消息队列系统(四)
八、客户端模块的实现
客户端实现的总体框架
在 RabbitMQ 中,应用层提供消息服务的核心实体是 信道(Channel)。
用户想要与消息队列服务器交互时,通常不会直接操作底层的 TCP 连接,而是通过信道来进行各种消息的发布、订阅、确认等操作。
信道可以看作是在单个 TCP 连接之上的轻量级虚拟连接,它负责封装具体的协议细节,屏蔽了网络通信的复杂性。
用户只需要调用信道提供的接口,发送消息或接收消息,无需关心底层数据如何传输、协议如何实现。
简单来说,用户面向的是信道服务接口,而信道背后处理了连接管理、数据编解码、协议交互等工作,实现了业务与网络通信的解耦。
客户端设计视角和服务器视角的对比
方面 | 客户端视角(信道) | 服务端视角(连接+信道) |
---|
主要关注点 | 调用信道接口完成消息操作 | 管理连接和信道,执行底层协议与业务逻辑 |
抽象层级 | 抽象出具体网络和协议细节 | 解析信道请求,实现消息路由和持久化 |
资源管理 | 不关心连接具体实现,只用信道 | 管理物理连接、信道状态及相关资源 |
多路复用 | 多信道复用同一连接,简化调用 | 维护多信道,保证隔离与并发性能 |
用户责任 | 只需调用信道接口 | 处理协议解析、消息调度、资源分配 |
- 连接模块
客户端和服务端进行网络连接的基础。
一个直接面向用户的模块,内部包含多个对外提供服务的接口,用户需要什么服务进行调用对应的接口即可,其中包含交换机的声明/删除,队列的声明与删除,队列的绑定与解绑,消息的发布与订阅,订阅和解除订阅。
表达了客户端与服务器之间在消息队列系统中协作的流程
在仿 RabbitMQ 的消息队列系统中,客户端首先通过订阅者模块注册自身的消费者身份,并指定对应的消息处理回调函数;随后通过信道模块在单一的 TCP 连接上实现多路复用,创建多个逻辑信道以并行处理不同的消息服务(如发布、订阅、队列管理等)。客户端通过连接模块建立与服务器的连接,并在信道中发起具体的请求服务。
服务器接收到连接请求后,由服务器端的连接管理器创建连接上下文,并根据信道中携带的请求类型,路由到对应的处理模块(如交换机、队列或消息模块),执行相应的业务逻辑。处理结果再由服务器的异步线程池将数据封装好并通过网络返回给客户端,客户端的异步事件机制或线程池再触发对应的回调函数完成消息消费流程。
基于以上模块实现客户端的思路就非常清晰了
1、实例化异步线程对象
2、实例化连接对象
3、通过连接对象进行创建信道
4、根据信道进行获取自己的所需服务
5、关闭信道
6、关闭连接
8.1、订阅者模块
订阅者对象的设计
一个不向用户进行直接展示的模块,在客户端中进行体现的作用就是对角色的描述,表示这就是一个消费者,用户通常不直接操作订阅逻辑,而是通过定义“消费者”的方式进行消息处理逻辑的注册。
一个信道只有一个订阅者,所以说不需要进行订阅者的管理。订阅者这个模块很简单,没有涉及到一些业务模块的内容,业务模块的服务都在信道模块进行提供。
订阅者模块(消费者)这个类中成员变量的设计
- 首先需要定义消费者ID,描述了收到该条消息后该如何进行对这条消息进行处理。
- 其次是要进行订阅的哪个队列的ID和自动删除标志,描述了收到消息后是否需要对消息进行回复,是否要进行自动删除。
- 最后是回调函数,描述了从队列中进行获取消息后应该如何进行处理,这部分由用户进行决定。
订阅者模块的实现
using ConsumerCallback = std::function<void(const std::string, const ys::BasicProperties *bp, const std::string)>;struct Consumer{using ptr=std::shared_ptr<Consumer>;std::string _cid;std::string _qname;bool _auto_ack;ConsumerCallback _callback;Consumer(){DLOG("new Consumer:%p",this);}Consumer(const std::string &cid, const std::string &qname, bool auto_ack, const ConsumerCallback &cb): _cid(cid), _qname(qname), _auto_ack(auto_ack), _callback(std::move(cb)){DLOG("new Consumer:%p",this);}~Consumer(){DLOG("del Consumer:%p",this);}};
在构造函数中,ConsumerCallback(是一个 std::function)内部可能有复杂对象(比如 Lambda、绑定的资源等),如果直接写 _callback(cb),会调用拷贝构造,可能涉及较多内存分配、资源拷贝,用 std::move(cb),可以让 ConsumerCallback 的内部资源被移动到 _callback,更高效。
8.2、异步工作线程模块
用户虽然是通过信道进行网络通信的,但是网络通信的本质还是需要进行IO事件的监控的,这就要通过IO监控线程来进行整,不能在当前线程进行IO事件的监控,这样的话就会在当前线程进行阻塞住了.下面通过表格的方式进行说明
模块 | loopthread | pool |
---|---|---|
订阅客户端 | 负责监听服务器消息推送(socket 读事件) | - 接收到消息后,异步将业务处理放到 pool 中去执行,因为收到的消息可能需要进行处理会耗时,防止主线程阻塞,无法进行监听服务器消息的推送 |
发布客户端 | 负责监听向服务器发送消息后的 socket 可写事件方便继续向服务器进行发送数据(或者响应 ack 等) | - 应用层调用发布接口时,耗时操作(如序列化、日志记录)在 pool 中处理 |
class AsyncWorker{public:using ptr=std::shared_ptr<AsyncWorker>;muduo::net::EventLoopThread loopthread;threadpool pool;};
8.3、连接模块
这其实就是我们在Demo模块利用muduo库进行搭建的客户端,这里其实就是换了一层皮,称为连接模块。
class Connection{public:using ptr=std::shared_ptr<Connection>;typedef std::shared_ptr<google::protobuf::Message> MessagePtr;Connection(const std::string &sip, int sport,AsyncWorker::ptr worker): _latch(1), _client(worker->loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"), _dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_worker(worker),_channel_manager(std::make_shared<ChannelManager>()){_dispatcher.registerMessageCallback<ys::basicConsumeResponse>(std::bind(&Connection::consumeResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::basicCommonResponse>(std::bind(&Connection::basicResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));_client.connect();_latch.wait(); // 阻塞等待,直到连接建立成功}Channel::ptr openChannel() {Channel::ptr channel = _channel_manager->create(_conn,_codec);bool ret=channel->opneChannel();if(ret==false){DLOG("打开信道失败");return Channel::ptr();}return channel;}void closeChannel(const Channel::ptr& channel){channel->closeChannel();_channel_manager->remove(channel->cid());}private://收到基础响应void basicResponse(const muduo::net::TcpConnectionPtr &conn, const basicCommonResponsePtr &message, muduo::Timestamp){//1、找到信道Channel::ptr channel=_channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道消息");return;}//2、将得到的响应对象进行添加到信道的基础响应channel->putBasicResponse(message);}//收到消息推送void consumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr &message, muduo::Timestamp){//1、找到信道Channel::ptr channel=_channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道消息");return;}//2、封装异步任务(消息处理任务),抛入线程池_worker->pool.push([channel,message](){channel->consume(message);});}void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){_latch.countDown(); // 唤醒主线程中的阻塞_conn = conn;}else{// 连接关闭时的操作_conn.reset();}}private:muduo::CountDownLatch _latch; // 实现同步的//muduo::net::EventLoopThread _loopthread; // 异步循环处理线程AsyncWorker::ptr _worker;muduo::net::TcpConnectionPtr _conn; // 客户端对应的连接muduo::net::TcpClient _client; // 客户端ProtobufDispatcher _dispatcher; // 请求分发器ProtobufCodecPtr _codec; // 协议处理器ChannelManager::ptr _channel_manager;};
在连接模块这里是有一个极易容易进行掉进坑里的陷阱 :当发布客户端进行向服务器进行发送建立连接请求的时候,由于TCP是有发送缓冲区和接收缓冲区的,当请求被发送到发送缓存区的时候,就会默认连接建立成功,但是此时的连接是没有被建立成功的,此时发布客户端误以为连接是建立成功的,就会执行后续操作,向服务器进行发送消息,此时就出现了问题。同样订阅客户端也类似。
因此在onConnection 函数中需要进行判断是否是真正的建立连接成功。
consumeResponse中
当连接收到消息推送后,需要_consumer 进行参与,因为只有consumer中有回调函数,知道进行收到消息推送时如何进行处理,这个接口到时候收到消息之后和消息一起进行封装成一个任务,把这个任务放到线程池中,并不在当前的主执行流中进行执行。
8.4、信道管理模块
信道模块的定位与主要职责
信道不仅仅是数据的通道,还承载着客户端的业务接口,因此这个模块不仅要进行信道结构的设计,还需要进行提供对应的业务逻辑。信道类可以理解为客户端在和消息服务器交互时的一条逻辑通道。它并不是单纯的数据结构,而是抽象出与服务器交互(各种请求/响应、状态维护等)的一套完整业务流程封装。
换句话说,其实可以将信道模块进行理解成将订阅者模块、异步线程模块、和连接模块进行统一封装管理。
客户端信道模块和客户端其他模块之间的交互关系
模块 | 交互内容 |
---|---|
连接模块 ( muduo::net::TcpConnection ) | - Channel 直接持有 TCP 连接 _conn - 通过 _conn 发送请求消息给服务器- 服务器响应也通过该连接返回 |
订阅者模块 ( Consumer ) | - Channel 通过 basicConsume() 创建订阅者对象 _consumer - 收到推送消息时, Channel::consume() 回调订阅者的处理逻辑 |
异步线程模块 (muduo 的 IO 线程) | - 服务器响应由 IO 线程收到 - 触发 Channel::putBasicResponse() ,将响应加入 _basic_resp - 触发 Channel::consume() ,调用用户回调 |
8.2.1、信道管理的信息
- 信道ID
- 信道关联的网络通信连接的对象
- protobuf 协议处理对象
- 信道关联的消费者
- 请求对应的响应信息队列(这里队列使用<请求ID,响应>hash表,一遍进行查找指定的响应)
- 互斥锁&条件变量(大部分的请求都是阻塞操作,发送请求后需要进行等到响应才能继续,但是muduo库的通信是异步的,因此需要我们子啊收到响应后,通过判断是否是等待的响应来进行同步)。
8.2.2、信道管理的操作
- 创建信道的操作
- 提供删除信道操作
- 提供声明交换机操作(强断言-有则OK,没有则创建)
- 提供删除交换机
- 提供创建队列操作(强断言-有则OK,没有则创建)
- 提供删除队列操作
- 提供交换机-队列绑定操作
- 提供交换机-队列解除绑定操作
- 提供添加订阅操作
- 提供取消订阅操作
- 提供发布消息操作
- 提供确认消息操作信道模块进行管理
using ProtobufCodecPtr=std::shared_ptr<ProtobufCodec>;using basicCommonResponsePtr=std::shared_ptr<ys::basicCommonResponse>;using basicConsumeResponsePtr=std::shared_ptr<ys::basicConsumeResponse>;class Channel{public:using ptr=std::shared_ptr<Channel>;Channel(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec):_cid(UUIDHelper::uuid()),_conn(conn),_codec(codec){}~Channel(){basicCancel();}bool opneChannel(){std::string rid=UUIDHelper::uuid();ys::openChannelRequest req;req.set_rid(rid);req.set_cid(_cid);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void closeChannel(){std::string rid=UUIDHelper::uuid();ys::closeChannelRequest req;req.set_rid(rid);req.set_cid(_cid);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器waitResponse(rid);//返回return;}bool declareExchange(const std::string& name,ys::ExchangeType type,bool durable,bool auto_delete,google::protobuf::Map<std::string,std::string>& args){//构造一个声明虚拟机的请求对象ys::declareExchangeRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);req.set_exchange_type(type);req.set_durable(durable);req.set_auto_delete(auto_delete);req.mutable_args()->swap(args);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void deleteExchange(const std::string& name){ys::deleteExchangeRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);return;}bool declareQueue(const std::string& qname,bool qdurable,bool qexclusive,bool qauto_delete,google::protobuf::Map<std::string,std::string> qargs){ys::declareQueueRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);req.set_durable(qdurable);req.set_exclusive(qexclusive);req.set_auto_delete(qauto_delete);req.mutable_args()->swap(qargs);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void deleteQueue(const std::string& qname){ys::deleteQueueRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器waitResponse(rid);//返回return;}bool queueBind(const std::string& ename,const std::string& qname,const std::string& key){ys::queueBindRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);req.set_binding_key(key);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void queueUnBind(const std::string& ename,const std::string& qname){ys::queueUnBindRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器waitResponse(rid);//返回return;}bool basicPublish(const std::string &ename, ys::BasicProperties *bp, const std::string &body){std::string rid=UUIDHelper::uuid();ys::basicPublishRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_body(body);req.set_exchange_name(ename);if(bp!=nullptr){req.mutable_properties()->set_id(bp->id());req.mutable_properties()->set_delivery_mode(bp->delivery_mode());req.mutable_properties()->set_routing_key(bp->routing_key());}//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void basicAck(const std::string &msgid)//删除参数qname,用户知道消费了哪个队列{if(_consumer.get()==nullptr){DLOG("消息确认时,找不到消费者的消息");return;}std::string rid=UUIDHelper::uuid();ys::basicAckRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(_consumer->_qname);req.set_message_id(msgid);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器waitResponse(rid);//返回return;}bool basicConsume(const std::string consume_tag,const std::string queue_name,bool auto_ack,const ConsumerCallback &cb){if(_consumer.get()!=nullptr){DLOG("当前信道已订阅其他队列消息");return false;}std::string rid=UUIDHelper::uuid();ys::basicConsumeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_consume_tag(consume_tag);req.set_queue_name(queue_name);req.set_auto_ack(auto_ack);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);if(resp->ok()==false){DLOG("添加订阅失败!");return false;}_consumer=std::make_shared<Consumer>(consume_tag,queue_name,auto_ack,cb);//返回return resp->ok();}void basicCancel(){//不一定是消费者if(_consumer.get()==nullptr){DLOG("取消订阅时,找不到消费者信息");return;}std::string rid=UUIDHelper::uuid();ys::basicCancelRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_consume_tag(_consumer->_cid);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器waitResponse(rid);_consumer.reset();//返回return;}std::string cid(){return _cid;}public://连接收到基础响应后,向hash_map中进行添加响应void putBasicResponse(const basicCommonResponsePtr& resp){std::unique_lock<std::mutex> lock(_mutex);_basic_resp.insert(std::make_pair(resp->rid(),resp));_cv.notify_all();}//连接收到消息推送后,需要通过信道进行找到对应的消费对象,通过回调函数进行消息处理void consume(const basicConsumeResponsePtr& resp){if(_consumer.get()==nullptr){DLOG("消息处理时,未找到订阅者消息");return;}if(_consumer->_cid!=resp->consume_tag()){DLOG("收到的推送消息中的消费者标识,与当前信道的消费者标识不一致!");return;}_consumer->_callback(resp->consume_tag(),resp->mutable_properties(),resp->body());return;}private://等待请求的响应basicCommonResponsePtr waitResponse(const std::string& rid){std::unique_lock<std::mutex> lock(_mutex);//while(condition()) _cv.wait();_cv.wait(lock,[&rid,this](){return _basic_resp.find(rid)!=_basic_resp.end();});basicCommonResponsePtr basic_resp=_basic_resp[rid];_basic_resp.erase(rid);return basic_resp;}private:std::string _cid;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;Consumer::ptr _consumer;std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string,basicCommonResponsePtr> _basic_resp;};
该模块的注意事项
_consumer 这个成员是不需要在构造函数时进行初始化,当前这个信道要进行订阅某个消息的时候,才能确定这个角色是一个消费者角色,此时在进去构建,要是再构造函数的过程中就进行去构建,万一这个信道的角色是发布客户端,就造成了资源的浪费
在移除信道的时候要是消费者需要进行取消订阅一下,因此添加一个析构函数
8.2.4、对提供创建信道操作
信道的增删查
class ChannelManager{public:using ptr=std::shared_ptr<ChannelManager>;ChannelManager(){}Channel::ptr create(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec){std::unique_lock<std::mutex> lock(_mutex);auto channel =std::make_shared<Channel>(conn,codec);_channels.insert(std::make_pair(channel->cid(),channel));return channel;}void remove(const std::string& cid){//进行删除的时候还需要进行考虑是否为消费者(是需要进行取消订阅)std::unique_lock<std::mutex> lock(_mutex);_channels.erase(cid);}Channel::ptr get(const std::string& cid){auto pos=_channels.find(cid);if(pos==_channels.end()){return Channel::ptr();}return pos->second;}private:std::mutex _mutex;std::unordered_map<std::string,Channel::ptr> _channels;};
九、功能联调
9.1、联调的思想
- 必须有一个生产者客户端
- 声明一个交换机exchange1
- 声明两个队列queue1(其中binding_key=queue1)、queue2(binding_key=new.music.#)
- 将这两个队列和交换机进行绑定起来
- 搭建两个消费者客户端,分别进行各自订阅一个队列的消息
- 第一次,将交换机的类型进行定义为广播交换模式:理论结果是两个消费者客户端都能拿到消息
- 第二次,将交换机的类型进行定义为直接交换模式:routing_key=queue1,理论是只有queue1能拿到消息
- 第三次,将交换机的类型进行定义成主题交换模式:routing_key=news.music.pop,理论是只有queue2能拿到结果
9.2、搭建发布客户端
以广播模式下的测试为例子
int main()
{//广播交换下进行测试//直接交换下进行测试//主题交换下进行测试//1、实例化异步工作线程对象brush::AsyncWorker::ptr awp=std::make_shared<brush::AsyncWorker>();//2、实例化连接对象brush::Connection::ptr conn=std::make_shared<brush::Connection>("127.0.0.1",8085,awp);//3、通过连接进行创建信道brush::Channel::ptr channel=conn->openChannel();//4、通过信道提供的服务完成所需//4.1、声明一个交换机exchange1,交换机的类型为广播模式google::protobuf::Map<std::string, std::string> temp_args;channel->declareExchange("exchange1",ys::TOPIC,true,false,temp_args);//4.2、声明两个队列queue1和queue2channel->declareQueue("queue1",true,false,false,temp_args);channel->declareQueue("queue2",true,false,false,temp_args);//4.3、绑定queue1-exchange1,且binding_key设置成queue1// 绑定queue2-exchange1,且binding_key设置成news.music.# channel->queueBind("exchange1","queue1","queue1");channel->queueBind("exchange1","queue2","news.music.#");//5、循环向交换机进行发布信息//广播交换// for(int i=1;i<10;i++)// {// std::string msg="Hello world-"+std::to_string(i);// channel->basicPublish("exchange1",nullptr,msg);// DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());// }//直接交换// for(int i=0;i<10;i++)// {// ys::BasicProperties bp;// bp.set_id(brush::UUIDHelper::uuid());// bp.set_delivery_mode(ys::DeliveryMode::DURABLE);// bp.set_routing_key("queue1");// std::string msg="Hello world-"+std::to_string(i);// channel->basicPublish("exchange1",&bp,msg);// DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());// }//主题交换for(int i=0;i<10;i++){ys::BasicProperties bp;bp.set_id(brush::UUIDHelper::uuid());bp.set_delivery_mode(ys::DeliveryMode::DURABLE);bp.set_routing_key("news.music.pop");std::string msg="Hello world-"+std::to_string(i);channel->basicPublish("exchange1",&bp,msg);DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());}ys::BasicProperties bp;bp.set_id(brush::UUIDHelper::uuid());bp.set_delivery_mode(ys::DeliveryMode::DURABLE);bp.set_routing_key("news.music.sport");std::string msg="Hello brush-";channel->basicPublish("exchange1",&bp,msg);DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());//6、关闭信道//channel->closeChannel();conn->closeChannel(channel);
}
9.3、搭建消费者客户端
同样是以广播模式下的测试为例子
//需要进行增加传参
void cb(brush::Channel::ptr& channel, const std::string& consumer_tag, const ys::BasicProperties *bp, const std::string&body)
{DLOG("%s - 消费了消息: %s",consumer_tag.c_str(),body.c_str());std::cout<<"body:"<<body<<std::endl;channel->basicAck(bp->id());
}
int main(int argc,char*argv[])
{if(argc!=2){DLOG("usage: ./consumer_client queue1");return -1;}//1、实例化异步工作线程对象brush::AsyncWorker::ptr awp=std::make_shared<brush::AsyncWorker>();//2、实例化连接对象brush::Connection::ptr conn=std::make_shared<brush::Connection>("127.0.0.1",8085,awp);//DLOG("实例化连接成功");//3、通过连接进行创建信道brush::Channel::ptr channel=conn->openChannel();//DLOG("打开信道成功");//4、通过信道提供的服务完成所需//4.1、声明一个交换机exchange1,交换机的类型为广播模式google::protobuf::Map<std::string, std::string> temp_args;channel->declareExchange("exchange1",ys::TOPIC,true,false,temp_args);//4.2、声明两个队列queue1和queue2channel->declareQueue("queue1",true,false,false,temp_args);channel->declareQueue("queue2",true,false,false,temp_args);//4.3、绑定queue1-exchange1,且binding_key设置成queue1// 绑定queue2-exchange1,且binding_key设置成news.music.# channel->queueBind("exchange1","queue1","queue1");channel->queueBind("exchange1","queue2","news.music.#");//5、进行订阅队列的消息(回调函数对消息进行确认)//auto functor=std::bind(cb,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel->basicConsume("consumer1",argv[1],false,functor);while(1) std::this_thread::sleep_for(std::chrono::seconds(3));//6、关闭信道conn->closeChannel(channel);return 0;
}
十、项目的扩展
- 我们项目中只实现了一个虚拟机的版本,实际上是可以有多个虚拟机的
- 我们是通过代码进行搭建客户端进行访问服务器的,可以进行拓展成管理接口,然后通过可视化的界面进行客户端的搭建
- 交换机/队列的独占模式和自动删除
- 发送方式的确认(broker 给生产者进行确认应答)功能也可以进行拓展实现
相关文章:
高性能分布式消息队列系统(四)
八、客户端模块的实现 客户端实现的总体框架 在 RabbitMQ 中,应用层提供消息服务的核心实体是 信道(Channel)。 用户想要与消息队列服务器交互时,通常不会直接操作底层的 TCP 连接,而是通过信道来进行各种消息的发布…...
C#异步编程:从线程到Task的进化之路
一、没有异步编程之前的时候 在异步编程出现之前,程序主要采用同步编程模型。这种模型下,所有操作按顺序执行,当一个操作(如I/O读写、网络请求)阻塞时,整个程序会被挂起,导致资源利用率低和响应延迟高。具体问题包括: 阻塞执行:同步代码在执行耗时操作时(如文件读取…...
[论文阅读] 人工智能+软件工程 | 用大模型优化软件性能
用大模型优化软件性能?这篇论文让代码跑出新速度! arXiv:2506.01249 SysLLMatic: Large Language Models are Software System Optimizers Huiyun Peng, Arjun Gupte, Ryan Hasler, Nicholas John Eliopoulos, Chien-Chou Ho, Rishi Mantri, Leo Deng, K…...

复变函数中的对数函数及其MATLAB演示
复变函数中的对数函数及其MATLAB演示 引言 在实变函数中,对数函数 ln x \ln x lnx定义在正实数集上,是一个相对简单的概念。然而,当我们进入复变函数领域时,对数函数展现出更加丰富和复杂的性质。本文将介绍复变函数中对数函…...

【Linux】Linux程序地址基础
参考博客:https://blog.csdn.net/sjsjnsjnn/article/details/125533127 一、地址空间的阐述 1.1 程序地址空间 下面的图片展示了程序地址空间的组成结构 我们通过代码来验证一下 int g_unval; int g_val 100;int main(int argc, char *argv[]);void test1() {i…...
React 项目初始化与搭建指南
React 项目初始化有多种方式,可以选择已有的脚手架工具快速创建项目,也可以自定义项目结构并使用构建工具实现项目的构建打包流程。 1. 脚手架方案 1.1. Vite 通过 Vite 创建 React 项目非常简单,只需一行命令即可完成。Vite 的工程初始化…...

将图形可视化工具的 Python 脚本打包为 Windows 应用程序
前文我们已经写了一个基于python的tkinter库和matplotlib库的图形可视化工具。 基于Python的tkinter库的图形可视化工具(15种图形的完整代码):基于Python的tkinter库的图形可视化工具(15种图形的完整代码)-CSDN博客 在前文基础上&…...
AWS DocumentDB vs MongoDB:数据库的技术抉择
随着非关系型数据库在现代应用中的广泛应用,文档型数据库因其灵活的结构与出色的扩展性,逐渐成为企业开发与架构设计中的核心选择。在众多文档数据库中,MongoDB 凭借其成熟生态与社区支持占据主导地位;与此同时,AWS 提…...

无人机军用与民用技术对比分析
一、材料区别 军用无人机: 1. 高强度特种材料: 大量使用钛合金、碳纤维复合材料,兼顾轻量化与高强度,提升抗冲击性和隐身性能。 关键部件依赖进口材料。 2. 隐身涂层: 采用雷达吸波材料和低红外特征涂料…...

刷leetcode hot100--矩阵6/1
1.螺旋矩阵【很久】6/1【感觉就是思路的搬运工,没完全理解】 54. 螺旋矩阵 - 力扣(LeetCode) 原来想 但是如果是奇数矩阵,遍历不到中间 解决思路: 用left,right,top,down标记/限定每次遍历的元素,每次从…...
Qt 中实现文本截断(ellipsis)的功能。Qt 提供了此方法来处理过长的文本显示问题,例如在界面中限制文本长度并添加省略号(...)
QElidedText 并不是 Qt 中的标准类名或功能名称,但根据你的描述,你可能是指 QFontMetrics::elidedText() 方法。这是一个用于在 Qt 中实现文本截断(ellipsis)的功能。Qt 提供了此方法来处理过长的文本显示问题,例如在界…...
Cisco IOS XE WLC 任意文件上传漏洞复现(CVE-2025-20188)
免责申明: 本文所描述的漏洞及其复现步骤仅供网络安全研究与教育目的使用。任何人不得将本文提供的信息用于非法目的或未经授权的系统测试。作者不对任何由于使用本文信息而导致的直接或间接损害承担责任。如涉及侵权,请及时与我们联系,我们将尽快处理并删除相关内容。 前…...
基于ResNet残差网络优化梯度下降算法实现图像分类
文章目录 题 目: 基于ResNet残差网络优化梯度下降算法实现图像分类基于ResNet残差神经网络优化梯度下降算法实现海贼王图像分类引言1.ResNet残差神经网络介绍1.1 ResNet残差神经网络的研究现状1.2 ResNet残差神经网络的原理1.3 ResNet残差神经网络的实现步骤1.3.1导入必要的库…...
群晖NAS套件历史版本资源
有时候需要下载历史的群晖套件,可以通过以下地址前往 Synology Archive Download Site - Index of /download 该页面汇集了各类群晖应用程序的过往版本,方便用户根据需要选择特定版本的软件进行下载安装。这种方式适用于需要旧版软件兼容性或进行版本回…...

Docker轻松搭建Neo4j+APOC环境
Docker轻松搭建Neo4jAPOC环境 一、简介二、Docker部署neo4j三、Docker安装APOC插件四、删除数据库/切换数据库 一、简介 Neo4j 是一款高性能的 原生图数据库,采用 属性图模型 存储数据,支持 Cypher查询语言,适用于复杂关系数据的存储和分析。…...

定制开发开源AI智能名片S2B2C商城小程序在无界零售中的应用与行业智能升级示范研究
摘要:本文聚焦无界零售背景下京东从零售产品提供者向零售基础设施提供者的转变,探讨定制开发开源AI智能名片S2B2C商城小程序在这一转变中的应用。通过分析该小程序在商业运营成本降低、效率提升、用户体验优化等方面的作用,以及其与京东AI和冯…...
CppCon 2015 学习:CLANG/C2 for Windows
Visual Studio 2015 引入了基于 CLANG/LLVM 的新代码生成器,及其背景和意义。简单理解如下: 理解要点: VS2015 中引入了全新的代码生成技术,性能和质量都很棒。这套新技术基于 Clang,微软展示了相关新工具。Clang 和…...
Spring中@Primary注解的作用与使用
在 Spring 框架中,Primary 注解用于解决依赖注入时的歧义性(Ambiguity)问题。当 Spring 容器中存在多个相同类型的 Bean 时,通过 Primary 标记其中一个 Bean 作为默认的首选注入对象。 核心作用: 解决多个同类型 Bean …...
Spring Boot + Elasticsearch + HBase 构建海量数据搜索系统
Spring Boot Elasticsearch HBase 构建海量数据搜索系统 📖 目录 1. 系统需求分析2. 系统架构设计3. Elasticsearch 与 HBase 集成方案4. Spring Boot 项目实现5. 大规模搜索系统最佳实践 项目概述 本文档提供了基于 Spring Boot、Elasticsearch 和 HBase 构建…...
[zynq] Zynq Linux 环境下 AXI BRAM 控制器驱动方法详解(代码示例)
Zynq Linux 环境下 AXI BRAM 控制器驱动方法详解 文章目录 Zynq Linux 环境下 AXI BRAM 控制器驱动方法详解1. UIO (Userspace I/O) 驱动方法完整示例代码 2. /dev/mem 直接内存映射方法完整示例代码 3. 自定义字符设备驱动方法完整示例代码 4. 方法对比总结5. 实战建议 在 Zyn…...

【大模型:知识图谱】--5.neo4j数据库管理(cypher语法2)
目录 1.节点语法 1.1.CREATE--创建节点 1.2.MATCH--查询节点 1.3.RETURN--返回节点 1.4.WHERE--过滤节点 2.关系语法 2.1.创建关系 2.2.查询关系 3.删除语法 3.1.DELETE 删除 3.2.REMOVE 删除 4.功能补充 4.1.SET (添加属性) 4.2.NULL 值 …...
六、数据库的安全性
六、数据库的安全性 数据库的安全问题 数据库中的数据是可以共享的数据共享必然带来数据库的安全性问题 数据库系统中的数据共享不能是无条件的共享数据库中数据的共享是在 DBMS 统一的严格控制之下的共享,即:只允许有合法使用权限的用户访其被授权的数…...

贪心算法应用:装箱问题(BFD算法)详解
贪心算法应用:装箱问题(BFD算法)详解 1. 装箱问题与BFD算法概述 1.1 装箱问题定义 装箱问题(Bin Packing Problem)是组合优化中的经典问题,其定义为: 给定n个物品,每个物品有大小wᵢ (0 < wᵢ ≤ C)无限数量的箱子…...
C#学习第27天:时间和日期的处理
时间和日期的核心概念 1. UTC 和 本地时间 UTC(Coordinated Universal Time): 是一种不受时区影响的世界标准时间。在网络通信和全球协作中,用于统一时间度量 本地时间(Local Time): 是根据所…...

编程技能:格式化打印05,格式控制符
专栏导航 本节文章分别属于《Win32 学习笔记》和《MFC 学习笔记》两个专栏,故划分为两个专栏导航。读者可以自行选择前往哪个专栏。 (一)WIn32 专栏导航 上一篇:编程技能:格式化打印04,sprintf 回到目录…...

MPLAB X IDE 软件安装与卸载
1、下载MPLAB X IDE V6.25 MPLAB X IDE | Microchip Technology 正常选Windows,点击Download,等待自动下载完成; MPLAB X IDE 一台电脑上可以安装多个版本; 2、安装MPLAB X IDE V6.25 右键以管理员运行;next; 勾选 I a…...

windows编程实现文件拷贝
项目源码链接: 实现文件拷贝功能(限制5GB大小) 81c57de 周不才/cpp_linux study - Gitee.com 知识准备: 1.句柄 句柄是一个用于标识和引用系统资源(如文件、窗口、进程、线程、位图等)的值。它不是资…...

[6-01-01].第12节:字节码文件内容 - 属性表集合
JVM学习大纲 二、属性表集合: 2.1.属性计数器: 2.2.属性表: 2.3.字节码文件组成5 -> 属性: 1.属性主要指的是类的属性,比如源码的文件名、内部类的列表等 2.4.字节码文件组成3 -> 字段: 1.字段中…...

基于机器学习的水量智能调度研究
摘要:随着城市化进程的加速和水资源供需矛盾的日益突出,传统的水量调度模式因缺乏精准预测和动态调控能力,难以满足现代供水系统对高效性、稳定性和节能性的要求。本文针对供水系统中用水峰谷预测不准确、能耗高、供需失衡等核心问题…...
深度解码:我如何用“结构进化型交互学习方法”与AI共舞,从学习小白到构建复杂认知体系
嗨,亲爱的学习者们,思考者们,以及所有渴望在知识海洋中自由翱行却时常感到迷茫的朋友们: 你是否也曾有过这样的深夜,面对堆积如山的学习资料,眼神迷离,内心却一片荒芜?明明每个字都…...