rabbitMq------客户端模块
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言
- 消费者模块
- 信道管理模块
- 管理的字段
- 提供的接口
- 信道内存管理
- 连接管理类
前言
在RabbitMQ中,提供服务的是信道,因此在客⼾端的实现中,弱化了Client客⼾端的概念。在RabbitMQ中并不会向⽤⼾展⽰⽹络通信的概念出来,⽽是以⼀种提供服务的形式来体现。实现思想类似于普通的功能接⼝封装,⼀个接⼝实现⼀个功能,接⼝内部完成向客⼾端请求的过程,但是对外并不需要体现出客⼾端与服务端通信的概念,⽤⼾需要什么服务就调⽤什么接⼝就⾏。
消费者模块
客户端这块对于消费者模块是不需要管理的,当进行消息订阅的时候,就会创建出一个消费者,而这消费者的作用就是:
• 描述当前信道订阅了哪个队列的消息。
• 描述了收到消息后该如何对这条消息进⾏处理。
• 描述收到消息后是否需要进⾏确认回复。
using ConsumerCallBack = std::function<void(const std::string&,BasicProperties *,const std::string &)>;struct Consumer{using ptr = std::shared_ptr<Consumer>;std::string _qname; //消费者订阅的队列名称std::string _ctag; //消费者标识bool _auto_ack; //自动应答标志ConsumerCallBack _cb; }
信道管理模块
同样的客户端也有信道,其功能与服务端几乎一致,只不过客户端的信道是为用户提供服务的,而服务器的信道是为客户端的请求提供服务的。也可以理解是⽤⼾通过客⼾端channel的接⼝调⽤来向服务端发送对应请求,获取请求的服务。
管理的字段
客户端的信道需要管理的字段中有一个哈希表,是请求id和通用响应结构的映射。
因为在muduo库中发送和接收都是异步的,例如我们声明一个交换机,这个请求可能还在发送缓冲区,并没有发送,我们此时如果去给这个交换机推送消息就会出问题。因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步。
class Channel{private:std::string _cid;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec; Consumer::ptr _consumer;//由于muduo库的发送和接收都是异步的,例如我们声明一个交换机,这个请求可能还在发送缓冲区,并没有发送,我们此时如果去给这个交换机推送消息就会出问题。因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string,basicCommonResponsePtr> _basic_resp;}
提供的接口
客户端的channel和服务端的接口都是几乎一致的,客户端的接口中是组织请求,向服务端发起请求,服务端的接口是接收请求进行业务处理。
但客户端中的channel提供了打开信道和关闭信道这俩个接口,他们是向服务端发起请求,在服务器上创建信道。
//这两个接口是向服务端发送请求,在服务端创建对应信道bool openChannel(){//组织请求openChannelRequest req;std::string rid = UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);//发送_codec->send(_conn,req);basicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}void closeChannel(){//组织请求closeChannelRequest req;std::string rid = UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);//发送_codec->send(_conn,req);waitResponse(rid);return;}
其中每个接口中都需要调用waitResponse,用来同步等待响应。
basicCommonResponsePtr waitResponse(std::string &rid){std::unique_lock<std::mutex> lock(_mutex);_cv.wait(lock,[&rid,this](){return _basic_resp.find(rid) != _basic_resp.end();});basicCommonResponsePtr basic_resp = _basic_resp[rid];_basic_resp.erase(rid);return basic_resp;}
信道内存管理
对信道的一个总的管理类。
有一个哈希表,是信道ID和信道对象的映射。
他提供了三个接口,创建信道/删除信道/获取指定信道。
class ChannelManager{private:std::mutex _mutex;std::unordered_map<std::string,Channel::ptr> _channels;}
连接管理类
在客⼾端这边,RabbitMQ弱化了客⼾端的概念,因为⽤⼾所需的服务都是通过信道来提供的,因此操作思想转换为先创建连接,通过连接创建信道,通过信道提供服务这⼀流程。
这个模块同样是针对muduo库客⼾端连接的⼆次封装,向⽤⼾提供创建channel信道的接⼝,创建信道后,可以通过信道来获取指定服务。
class Connection {public:using ptr = std::shared_ptr<Connection>;
private:muduo::CountDownLatch _latch;//实现同步的muduo::net::TcpConnectionPtr _conn;//客户端对应的连接muduo::net::TcpClient _client;//客户端ProtobufDispatcher _dispatcher;//请求分发器ProtobufCodecPtr _codec;//协议处理器AsyncWorker::ptr _worker;ChannelManager::ptr _channel_manager;}
在客户端这边,会收到两种响应,一种是基础响应,一种是消息推送响应。
我们需要注册对于这两种类型响应的回调函数。
基础响应,在基础响应中有一个cid字段,根据cid获取指定的信道对象,
调用信道对象中的putBasicResponse接口。
也就是往信道的hashMap中添加响应值。
void basicResponse(const muduo::net::TcpConnectionPtr& conn, const basicCommonResponsePtr& message, muduo::Timestamp) {//1. 找到信道Channel::ptr channel = _channel_manager->get(message->cid());if (channel.get() == nullptr) {DLOG("未找到信道信息!");return;}//2. 将得到的响应对象,添加到信道的基础响应hash_map中channel->putBasicResponse(message);
}
消息推送,在收到消息推送的响应后,需要更具响应中的rid,获取指定的信道对象,然后封装一个任务,这个任务就是调用信道中的consume接口,这个接口就是执行消费者中设置的回调函数。我们把这个任务放入到线程池中。
void consumeResponse(const muduo::net::TcpConnectionPtr& conn, const basicConsumerResponsePtr& message, muduo::Timestamp){
//1. 找到信道Channel::ptr channel = _channel_manager->get(message->cid());if (channel.get() == nullptr) {DLOG("未找到信道信息!");return;}//2. 封装异步任务(消息处理任务),抛入线程池_worker->pool.push([channel, message](){channel->consume(message);});
}
连接管理提供了两个接口,打开信道和关闭信道。这是给用户提供的,用来创建一个信道,通过信道进行服务的调用。
Channel::ptr openChannel() {Channel::ptr channel = _channel_manager->create(_conn, _codec);bool ret = channel->openChannel();if (ret == false) {DLOG("打开信道失败!");return Channel::ptr();}return channel;
}void closeChannel(const Channel::ptr &channel) {channel->closeChannel();_channel_manager->remove(channel->cid());
}
相关文章:
rabbitMq------客户端模块
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言消费者模块信道管理模块管理的字段提供的接口 信道内存管理连接管理类 前言 在RabbitMQ中,提供服务的是信道,因此在客⼾端的实现中&…...

地理定位营销与开源AI智能名片O2O商城小程序的融合与发展
摘要:本文阐述地理定位营销的概念、手段及其在商业中的应用,探讨开源AI智能名片O2O商城小程序如何与地理定位营销相结合,为企业营销带来新的机遇与挑战。 一、引言 在当今数字化营销的时代,地理定位营销已成为一种重要的营销手段…...
解决Vue应用中遇到路由刷新后出现 404 错误
解释: Vue 应用中遇到路由刷新后出现 404 错误,通常是因为 Vue 应用是个单页应用(SPA),它通过 Vue Router 管理路由,通过 HTML5 History Mode 实现页面导航无需重新加载页面。当直接访问非首页的路由或者刷…...
在window10下使用directml加速phi-3模型的一些记录
1.安装anaconda,安装python 安装torch等参考网上资料非常多 不细描述 2.参考微软官网【在windows上通过DirectML启用Pytorch文档,检查系统版本 检查gpu版本 3.参考微软官网【在windows上通过DirectML启用Pytorch】文档,安装torch_directml模…...

通信工程学习:什么是OSPF开放式最短路径优先
OSPF:开放式最短路径优先 OSPF(Open Shortest Path First,开放式最短路径优先)是一种内部网关协议(IGP),被广泛应用于计算机网络中,特别是在构建大型和复杂的网络时。以下是对OSPF的…...

《中国电子报》报道: 安宝特AR为产线作业者的“秘密武器
近日,中国电子报在其文章《下一代工业智能终端重新定义制造业》中对安宝特的增强现实(AR)解决方案给予了高度评价,称其为产线作业者的“秘密武器”。这一创新技术改变了传统制造业的作业方式,使得操作人员能够在生产过…...

【Android】Handler消息机制
文章目录 前言概述核心组件概述Android消息机制概述 Android消息机制分析ThreadLocal的工作原理ThreadLocal基础ThreadLocal实现原理 MessageQueueLooperHandler的工作原理总结 前言 本文用于记录Android的消息机制,主要是指Handler的运行机制。部分内容参考自《An…...
大数据必懂知识点:Parquet、ORC还是Avro作为数据存储格式,哪种在性能和压缩率上更优
目录 第一章 相关理论 1.1 Parquet格式介绍 1.1.1 起源与发展 1.1.2 特点与优势 1.2 ORC格式介绍 1.3 Avro格式介绍 1.3.1 跨语言支持 1.3.2 动态映射 1.3.3 丰富的数据模式 1.3.4 数据模式灵活性 第二章 种格式性能比较 2.1 读写性能对比 2.2 查询性能对比 2.3 压…...
P1387 最大正方形
题目描述 在一个nm 的只包含 0 和 1 的矩阵里找出一个不包含 0 的最大正方形,输出边长。 输入格式 输入文件第一行为两个整数n,m(1≤n,m≤100),接下来 n 行,每行 m 个数字,用空格隔开,0 或 1。 输出格式 一个整数…...
Python知识点:如何使用Multiprocessing进行并行任务管理
开篇,先说一个好消息,截止到2025年1月1日前,翻到文末找到我,赠送定制版的开题报告和任务书,先到先得!过期不候! 如何在Python中使用Multiprocessing进行并行任务管理 在现代编程中,…...
React常见优化问题
在React开发中,性能优化是一个重要且持续的过程,旨在提升应用的响应速度和用户体验。以下是一些常见的React优化问题详解,并附上相应的代码示例。 1. 避免不必要的组件渲染 React组件的渲染是由其props或state的变化触发的。但是,…...

css 简单网页布局——浮动(一)
1. 三种布局方式 1.1 标准流 1.2 浮动的使用 1.3 简述浮动 1.3.1 浮动三大特性 <style>.out {border: 1px red solid;width: 1000px;height: 500px;}.one {background-color: aquamarine;width: 200px;height: 100px;}.two {background-color: blueviolet;width: 200px;h…...
设计模式(3)builder
需求: 对于复杂的对象,我们只需要 通过 设置一些参数,就可以得到相对应的 实例。 简单来说, 需求就是用一个类 通过方法返回一个 新建的对象,而且可以通过方法去设置这个对象 public interface CarBuilder {void se…...

Day01-MySQL数据库介绍及部署
Day01-MySQL数据库介绍及部署 1、数据库服务概述介绍1.1 企业中为什么需要数据库?1.2 数据库服务作用1.3 数据库服务分类 2、数据库服务安装部署2.1 数据库版本应用2.2 数据库服务程序下载2.3 数据库软件安装方式2.3.1 二进制安装步骤 3、数据库服务初始化介绍3.1 安…...

分享一个餐饮连锁店点餐系统 餐馆食材采购系统Java、python、php三个版本(源码、调试、LW、开题、PPT)
💕💕作者:计算机源码社 💕💕个人简介:本人 八年开发经验,擅长Java、Python、PHP、.NET、Node.js、Android、微信小程序、爬虫、大数据、机器学习等,大家有这一块的问题可以一起交流&…...

解决跨域问题
第一种 让后端解决 第二种 通过代理来解决 首先可以先搭建后端接口 解决则参照vue-cli官网 首先新建一个vue.config.js文件 然后在项目的根目录新建两个文件夹 开发环境和生产环境 然后可以使用环境变量 系统会自动识别你是生产环境还是开发环境 然后在封装的axios中配…...
面试知识储备-多线程
1.线程的概念 线程使得在一个程序中可以同时执行多个任务。在 Java 应用程序中,多个线程可以同时运行,例如一个线程可以处理用户输入,另一个线程可以进行后台数据处理。 2.创建线程的方式 (1)重写thread类中的run方法…...

边缘计算插上AI的翅膀会咋样?
人工智能(Artificial Intelligence,AI)是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门新的技术科学,是新一轮产业革命的重要驱动力量。2022年底发布的ChatGPT将人工智能技术上升到了一个新的高度。如今&#x…...

脉冲神经网络(SNN)论文阅读(六)-----ECCV-2024 脉冲驱动的SNN目标检测框架:SpikeYOLO
原文链接:CSDN-脉冲神经网络(SNN)论文阅读(六)-----ECCV-2024 脉冲驱动的SNN目标检测框架:SpikeYOLO Integer-Valued Training and Spike-Driven Inference Spiking Neural Network for High-performance …...
周报_2024/10/6
周报 时间 2024/9/30——2024/10/6 科研进展 写项目标书 实验了不同比例的标签加噪,模型效果随着标签加噪比例增加下降明显 下周计划 构造概念漂移数据集 借鉴其他文章中应对标签加噪的做法...

第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...
Frozen-Flask :将 Flask 应用“冻结”为静态文件
Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是:将一个 Flask Web 应用生成成纯静态 HTML 文件,从而可以部署到静态网站托管服务上,如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...

SpringTask-03.入门案例
一.入门案例 启动类: package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...

Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...