当前位置: 首页 > news >正文

网页端HTML使用MQTTJs订阅RabbitMQ数据

  最近在做一个公司的日志组件时有一个问题难住了我。今天问题终于解决了。由于在解决问题中,在网上也查了很多资料都没有一个完整的实例可以参考。所以本着无私分享的目的记录一下完整的解决过程和实例。

  需求:做一个统一日志系统可以查看日志列表和一个可以订阅最新日志的页面。通过提供一个封装好日志记录方法的sdk文件将日志统一收集。

  通过上面的需求进行我们使用RabbitMQ+Mongodb来实现系统。

  使用C#封装一个SDK大家都会这里就不说了。C#连接RabbitMQ示例代码也是一堆堆的也没什么好说的。下面重点说一下网页端如何使用JS去订阅RabbitMQ收到的最新日志信息。

  后端都是使用RabbitMQ的AMQP协议,而前端要求在网页HTML上显示数据。我们选择了使用MQTT协议从RabbitMQ中订阅数据。

  具体步骤:

1、先准备好相关JS库。MQTT有一个叫browserMqtt.js看名字就知道是为浏览器提供的JS库。还有一个封装了操作MQ的JS库 mqfactory.js。最后还要一个jquery.js文件。这样工具就准备好了。JS文件下载

2、HTML端代码。

<script type="text/javascript" src="~/js/MqJs/jquery.js"></script>
<script type="text/javascript" src="~/js/MqJs/browserMqtt.min.js"></script>
<script type="text/javascript" src="~/js/MqJs/mqfactory.js"></script>
<body><div><lable>Host: </lable><input id="txtHost" placeholder="192.168.1.88" value="10.1.0.7" /><br /><lable>Port: </lable><input id="txtPort" placeholder="15675" value="15675" /><br /><label>UserName: </label><input id="txtUserName" placeholder="username" value="admin" /><br /><label>Password: </label><input id="txtPassword" placeholder="password" value="admin" /><br /><label>Protocol: </label><input id="txtProtocol" placeholder="ws" value="ws" /><br /><input id="btnConnect" type="button" value="Connect RabbitMQ" /></div><div><input id="btnSubscribe" type="button" value="Subscribe" /><input id="btnPublish" type="button" value="Publish" /><br /><input id="btnSSHuanjing" type="button" value="Subscribe Huanjing" /><input id="hdnIsSubscribed" type="hidden" value="" /><input id="btnPubHuanjing" type="button" value="Publish Huanjing"><br />路由:<input id="btnRoutingKey" type="text" value="Dcon/Logs/Client"><br /><input id="txtMessage" type="text" placeholder="Please enter message" /></div><div><label>log:</label><br /><ul id="lstLog"></ul><input id="btnClearLog" type="button" value="Clear Log" /></div>
</body>
<script type="text/javascript">$(function () {var mqclient;//var routingKey = 'Dcon.Logs.ServerWebShow';var message;$('#btnSubscribe').attr('disabled', 'disabled');$('#btnPublish').attr('disabled', 'disabled');$('#btnSSHuanjing').attr('disabled', 'disabled');$('#btnPubHuanjing').attr('disabled', 'disabled');$('#btnConnect').click(function () {var mqttOpts = {host: (() => $('#txtHost').val())(),port: (() => $('#txtPort').val())(),username: (() => $('#txtUserName').val())(),password: (() => $('#txtPassword').val())(),//transformWsUrl方法用于在浏览器中使用MQTT的场景,默认情况下,MQTT自动生成的url为ws://ip:port形式,//然而服务器要求的格式是ws://ip:port/ws,所以MQTT提供了此接口用于在生成url时自定义url格式transformWsUrl: (url, opts, client) => { return opts.protocol && opts.protocol == 'ws' ? url + 'ws' : url; },clientId: (() => { return 'mqttjs_' + Math.random().toString(16).substr(2, 8); })()};var biz = {huanjing: function (handler, isOn) {if (isOn !== false) {this.ss(this.topics.huanjing, handler);} else {this.sus(this.topics.huanjing, handler);}},topics: {huanjing: '/hyj/huanjing/monitor'}};//系统初始化时注入连接选项mqfactory.inject(mqttOpts, biz);//创建mqclient单例 mqclient = mqfactory.create();//注册mqclient的连接成功事件mqclient.on('connect', mqconnected);});$('#btnSubscribe').click(function () {if ($(this).val() == 'Subscribe') {//订阅成功后,仅注册一次事件(要考虑每次注册事件时,事件处理器调用的次数,如果仅用一次,就用once方法)//routingKey = $("#btnRoutingKey").val();mqclient.once('onss', mqSubscribeSuccess);//简单订阅mqclient.ss($("#btnRoutingKey").val());} else {mqclient.once('onsus', mqUnsubscribeSuccess)mqclient.sus($("#btnRoutingKey").val());}});$('#btnPublish').click(function () {var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();if (message === msg) {msg = guid();}message = msg;$('#txtMessage').val(message);//发送消息mqclient.pub($("#btnRoutingKey").val(), message);$('#lstLog').append('<li>Send Message: ' + message + '</li>');});$('#btnSSHuanjing').click(function () {if ($(this).val() == 'Subscribe Huanjing') {mqclient.once('onss', mqHJSubscribeSuccess);mqclient.huanjing(onHuanjingMessageArrived);} else {mqclient.once('onsus', mqHJUnsubscribeSuccess);mqclient.huanjing(onHuanjingMessageArrived, false);}});$('#btnPubHuanjing').click(function () {var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();if (message === msg) {msg = guid();}message = msg;$('#txtMessage').val(message);//发送消息mqclient.pub(mqclient.topics.huanjing, message);$('#lstLog').append('<li>Send Huanjing Message: ' + message + '</li>');});$('#btnClearLog').click(function () {$('#lstLog').empty();});function mqconnected() {//alert("mqconnected");$('#btnSubscribe').removeAttr('disabled');$('#btnPublish').removeAttr('disabled');$('#btnSSHuanjing').removeAttr('disabled');$('#btnPubHuanjing').removeAttr('disabled');$('#lstLog').append('<li>mqclient connected</li>');}function mqSubscribeSuccess() {//订阅成功,就注册接受消息的方法,此处要接收多次,因此使用了onmqclient.on($("#btnRoutingKey").val(), onMessageArrived);$('#btnSubscribe').val('Unsubscribe');$('#lstLog').append('<li>Subscribe successful.' + $("#btnRoutingKey").val()+'</li>');}function mqUnsubscribeSuccess() {//注销订阅,所以将事件处理器解除绑定mqclient.off($("#btnRoutingKey").val(), onMessageArrived);$('#btnSubscribe').val('Subscribe');$('#lstLog').append('<li>Unsubscribe successful</li>');}function mqHJSubscribeSuccess() {$('#btnSSHuanjing').val('Unsubscribe Huanjing');$('#lstLog').append('<li>Hanjing Subscribe successful</li>');}function mqHJUnsubscribeSuccess() {$('#btnSSHuanjing').val('Subscribe Huanjing');$('#lstLog').append('<li>Huanjing Unsubscribe successful</li>');}function onMessageArrived(message) {$('#lstLog').append('<li>Receive message: ' + new Date().toString() + '    ' + message.toString() + '</li>');}function onHuanjingMessageArrived(message) {$('#lstLog').append('<li>Receive Huanjing message: ' + new Date().toString() + '    ' + message.toString() + '</li>');}function guid() {function s4() {return Math.floor((1 + Math.random()) * 0x10000).toString(16).substring(1);}return s4() + s4() + '-' + s4() + '-' + s4() + '-' +s4() + '-' + s4() + s4() + s4();}});
</script>

3.后端代码:

3.1客户端sdk代码

/// <summary>/// 写日志/// </summary>/// <param name="model"></param>public static void Write(LogModel model){//判断写入的日志级别if (model != null && model.LogLevel >= LogLevel){try{var mqMsg = new MqMessage(){MessageBody = JSON.Serialize(model),MessageRouter = SystemConst.RoutingKeyTopic.LogTopic_Producer};//MQHelper.Instance.ProducerMessage_Fanout(mqMsg);MQHelper.Instance.ProducerMessage_Topic(mqMsg);}catch (Exception ex){var errorLog = string.Format("Ip:{0},LogHelper.Write方法异常,{1}", IpHelper.LocalHostIp, ex.Message);//MQHelper.Instance.ProducerMessage_Fanout(new MqMessage() { MessageBody = errorLog });MQHelper.Instance.ProducerMessage_Topic(new MqMessage() { MessageBody = errorLog });}}}

3.2后端MQ代码:

#region 主题 交换机/// <summary>/// 生产者 客户端调用/// </summary>/// <param name="msg"></param>public void ProducerMessage_Topic(MqMessage msg){try{using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){var body = Encoding.UTF8.GetBytes(msg.MessageBody);channel.BasicPublish(exchange: SystemConst.MqName_LogMq_TopicDefault,routingKey: msg.MessageRouter,basicProperties: null,body: body);Console.WriteLine(" [x] Sent {0}", msg.MessageBody);}}}catch (Exception ex){var exMsg = ex.Message;}}/// <summary>/// 消费者 服务器接收并写入数据库/// 消费方法无法通过参数传入/// EventHandler<BasicDeliverEventArgs> received/// </summary>public void ConsumeMessage_Topic(params string[] routingKeys){if (routingKeys == null || routingKeys.Length == 0){throw new Exception("请指定接收路由");}using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){var queueName = channel.QueueDeclare().QueueName;//获得已经生成的随机队列名//对列与交换机绑定foreach (var rKey in routingKeys){channel.QueueBind(queue: queueName,exchange: SystemConst.MqName_LogMq_TopicDefault,routingKey: rKey);}var consumer = new EventingBasicConsumer(channel);//绑定消费方法consumer.Received += consomer_Received_Topic;//绑定消费者channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine("日志订阅服务启动成功.");Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}}/// <summary>/// 接收通知服务异步的推送/// </summary>/// <param name="sender"></param>/// <param name="e"></param>void consomer_Received_Topic(object sender, BasicDeliverEventArgs e){var body = e.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] {0}", message);
//这里可以增加写入数据库的代码}#endregion

3.3路由

/// <summary>/// 主题路由/// </summary>public class RoutingKeyTopic{/// <summary>/// 生产者/// </summary>public const string LogTopic_Producer = "Dcon.Logs.Client";/// <summary>/// 消息者_日志服务_保存日志/// </summary>public const string LogTopic_Consume_Server_SaveDB = "Dcon.Logs.*";/// <summary>/// 消息者_日志服务_Web显示日志/// </summary>public const string LogTopic_Consume_Server_WebShow = "Dcon.Logs#";//".Logs.Client";/// <summary>/// 消息者_日志服务_Web显示日志/// </summary>public const string LogTopic_Consume_Server_WebShow_T = "*.Logs.Client";//".Logs.Client";/// <summary>/// 消息者_日志服务_ # 接收所有/// </summary>public const string LogTopic_Consume_Server_All = "#";//".Logs.Client";}}

注意点:

1、MQTT的路由是以 / 来分割的。在RabbitMQ中会被转义成 . 如示例中的路由Dcon/Logs/Client会被转换成 Dcon.Logs.Client

2、网页端接收时的路由要和发送端的路由一至。也就是说 后端用 Dcon.Logs.Client 来推数据前端就要使用 Dcon/Logs/Client来接收数据。

3、MQTT路由不支持通配符.

4、由于MQTT的JS库没有提供Topic交换机与路由绑定功能。所以前端接收时 不能设置订阅主题交换机名称。如果要和amqp交互只能使用amqp的默认主题交换机名称 amq.topic

运行效果图:

相关文章:

网页端HTML使用MQTTJs订阅RabbitMQ数据

最近在做一个公司的日志组件时有一个问题难住了我。今天问题终于解决了。由于在解决问题中&#xff0c;在网上也查了很多资料都没有一个完整的实例可以参考。所以本着无私分享的目的记录一下完整的解决过程和实例。 需求&#xff1a;做一个统一日志系统可以查看日志列表和一个可…...

课题学习(二十一)----姿态更新的四元数算法推导

声明&#xff1a;本人水平有限&#xff0c;博客可能存在部分错误的地方&#xff0c;请广大读者谅解并向本人反馈错误。    最近需要使用AEKF对姿态进行结算&#xff0c;所以又对四元数进了深入的学习&#xff0c;本篇博客仅对四元数进行推导&#xff0c;后续会对基于四元数的…...

NL2SQL进阶系列(5):论文解读业界前沿方案(DIN-SQL、C3-SQL、DAIL-SQL、SQL-PaLM)、新一代数据集BIRD-SQL解读

NL2SQL进阶系列(5)&#xff1a;论文解读业界前沿方案&#xff08;DIN-SQL、C3-SQL、DAIL-SQL&#xff09;、新一代数据集BIRD-SQL解读 NL2SQL基础系列(1)&#xff1a;业界顶尖排行榜、权威测评数据集及LLM大模型&#xff08;Spider vs BIRD&#xff09;全面对比优劣分析[Text2…...

双指针运用:删除重复元素、移除元素

26.删除重复元素 题目描述 给你一个 非严格递增排列 的数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使每个元素 只出现一次 &#xff0c;返回删除后数组的新长度。元素的 相对顺序 应该保持 一致 。然后返回 nums 中唯一元素的个数。 考虑 nums 的唯一元…...

什么是三高架构

三高架构是指在软件系统设计与开发中&#xff0c;注重解决高并发性、高可用性和高性能的架构设计模式。 高并发性&#xff1a;指系统能够处理大量并发请求的能力。在高并发场景下&#xff0c;系统需要具备有效的并发处理机制&#xff0c;以保证系统能够快速、准确地响应大量并…...

Unity 对APK签名

关键代码 PS D:\UnityProject\YueJie> jarsigner -verbose -keystore D:\UnityProject\YueJie\user.keystore -signedjar D:\UnityProject\YueJie\meizuemptyapk-release-signed.apk D:\UnityProject\YueJie\MeizuEmpty-release-unsigned.apk 1 示例 # jarsigner的命令格…...

合成孔径雷达干涉测量InSAR数据处理、地形三维重建、形变信息提取、监测等应用

合成孔径雷达干涉测量&#xff08;Interferometric Synthetic Aperture Radar, InSAR&#xff09;技术作为一种新兴的主动式微波遥感技术&#xff0c;凭借其可以穿过大气层&#xff0c;全天时、全天候获取监测目标的形变信息等特性&#xff0c;已在地表形变监测、DEM生成、滑坡…...

QT进阶------------------QPushButton(快速添加按钮与使用)

1、解决如何快速的添加按钮 在qt中&#xff0c;通常我们喜欢一个按钮添加一个信号与槽&#xff0c;但是这样写太过浪费时间。要是多个按钮那不是要写30个信号与槽&#xff0c;说实话&#xff0c;我不太喜欢这样。 在ui中&#xff0c;只要拖动按钮&#xff0c;会自动生成按钮的名…...

Vue项目管理器创建项目

黑马程序员JavaWeb开发教程 文章目录 1、创建新项目2、详情3、预设4、功能5、配置6、是否保存为预设模板7、正在创建项目8、创建完成 1、创建新项目 2、详情 3、预设 选择手动&#xff0c;点击下一步 4、功能 只需要额外选择一项–Router 即可&#xff0c;其余的保持默认&a…...

PHP-extract变量覆盖

[题目信息]&#xff1a; 题目名称题目难度PHP-extract变量覆盖1 [题目考点]&#xff1a; 变量覆盖指的是用我们自定义的参数值替换程序原有的变量值&#xff0c;一般变量覆盖漏洞需要结合程序的其它功能来实现完整的攻击。 经常导致变量覆盖漏洞场景有&#xff1a;$$&#x…...

研究表明,全球互联网流量竟有一半来自机器人

据Cyber News消息&#xff0c;Thales Imperva Bad Bot近期做了一份报告&#xff0c;显示在2023年有49.6%的互联网流量竟来自机器人&#xff0c;比上一年增长 2%&#xff0c;达到自2013年以来观察到的最高水平。 报告称&#xff0c;这一趋势正对企业组织产生负面影响&#xff0c…...

橡胶衬板的更换与安装

橡胶衬板的更换与安装 橡胶衬板作为一种重要的工业材料&#xff0c;广泛应用于各种设备和机器中&#xff0c;以提供减震、防滑、耐磨等功能。然而&#xff0c;随着时间的推移和使用频率的增加&#xff0c;橡胶衬板可能会磨损或老化&#xff0c;需要及时更换和安装。本文将介绍…...

Compose 简单组件

文章目录 Compose 简单组件TextText属性使用AnnotatedStringSpanStyleParagraphStyle SelectionContainer 和 DisableSelectionClickableText TextFieldTextField属性使用OutlinedTextFieldBasicTextFieldKeyboardOptions 键盘属性KeyboardActions IME动作 ButtonButton属性使用…...

第十一届蓝桥杯省赛真题(C/C++大学B组)

目录 试题A &#xff1a;门牌制作 试题B &#xff1a;既约分数 试题C &#xff1a;蛇形填数 试题D &#xff1a;跑步训练 试题E &#xff1a;七段码 试题F &#xff1a;成绩统计 试题G &#xff1a;回文日期 试题H &#xff1a;字串分值 试题I &#xff1a;平面切分&a…...

Qt 实战(2)搭建开发环境 | 2.1、Windows下安装QT

一、Windows下安装QT 1、QT官网 QT官网&#xff1a;https://download.qt.io/&#xff0c;打开官网地址&#xff0c;如下&#xff1a; 目录结构介绍 目录说明snapshots预览版&#xff0c;最新的开发测试中的 Qt 库和开发工具onlineQt 在线安装源official_releases正式发布版&am…...

校园通用型发生网络安全事件解决方案

已知校园多教学楼、多教学机房、非标网络机房缺乏防护设备、检测设备、安全保护软件(杀软) 切断所有外网&#xff0c;断网理&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xf…...

数通HCIE考试分享:考前心态很重要,心情放松好过一次练习

誉天数通HCIE晚班火热预约中&#xff01;真机实验考前辅导备考资料&#xff0c;名师保驾护航&#xff0c;助你稳定通关&#xff01;识别二维码&#xff0c;即可获取免费试听名额&#xff01; 备考阶段 我是去年10月底完成了笔试考试&#xff0c;在笔试之前就将PY的课程过了一遍…...

GVRP协议与动态、静态vlan

一、GVRP协议使用场景 1、当实际组网复杂到网络管理员无法短时间内了解网络的拓扑结构&#xff0c;或者是整个网络的VLAN太多时&#xff0c;工作量会非常大&#xff0c;而且非常容易配置错误。在这种情况下&#xff0c;用户可以通过GVRP的VLAN自动注册功能完成VLAN的配置。 2、…...

shell脚本启动jar包

1、启动脚本的命令start.sh # 设置jar包名称 JAR_NAME"ruoyi-admin.jar" # 使用pgrep查找jar包名称的进程&#xff0c;如果存在&#xff0c;返回0&#xff08;表示找到了进程&#xff09; if pgrep -f "$JAR_NAME" >/dev/null thenecho "Jar进程已…...

qt 元对象系统及属性系统

Qt元对象系统(QMetaObject) Qt 的元对象系统叫 Meta-Object-System&#xff0c;提供了对象之间通信的信号与槽机制、运行时类型信息和动态属性系统。即使编译器不支持RTTI&#xff08;RTTI的实现耗费了很大的时间和存储空间&#xff0c;这就会降低程序的性能&#xff09;&…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...

LabVIEW双光子成像系统技术

双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制&#xff0c;展现出显著的技术优势&#xff1a; 深层组织穿透能力&#xff1a;适用于活体组织深度成像 高分辨率观测性能&#xff1a;满足微观结构的精细研究需求 低光毒性特点&#xff1a;减少对样本的损伤…...

Qt 事件处理中 return 的深入解析

Qt 事件处理中 return 的深入解析 在 Qt 事件处理中&#xff0c;return 语句的使用是另一个关键概念&#xff0c;它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别&#xff1a;不同层级的事件处理 方…...

SpringAI实战:ChatModel智能对话全解

一、引言&#xff1a;Spring AI 与 Chat Model 的核心价值 &#x1f680; 在 Java 生态中集成大模型能力&#xff0c;Spring AI 提供了高效的解决方案 &#x1f916;。其中 Chat Model 作为核心交互组件&#xff0c;通过标准化接口简化了与大语言模型&#xff08;LLM&#xff0…...

k8s从入门到放弃之HPA控制器

k8s从入门到放弃之HPA控制器 Kubernetes中的Horizontal Pod Autoscaler (HPA)控制器是一种用于自动扩展部署、副本集或复制控制器中Pod数量的机制。它可以根据观察到的CPU利用率&#xff08;或其他自定义指标&#xff09;来调整这些对象的规模&#xff0c;从而帮助应用程序在负…...

Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践

在 Kubernetes 集群中&#xff0c;如何在保障应用高可用的同时有效地管理资源&#xff0c;一直是运维人员和开发者关注的重点。随着微服务架构的普及&#xff0c;集群内各个服务的负载波动日趋明显&#xff0c;传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...

倒装芯片凸点成型工艺

UBM&#xff08;Under Bump Metallization&#xff09;与Bump&#xff08;焊球&#xff09;形成工艺流程。我们可以将整张流程图分为三大阶段来理解&#xff1a; &#x1f527; 一、UBM&#xff08;Under Bump Metallization&#xff09;工艺流程&#xff08;黄色区域&#xff…...

【java面试】微服务篇

【java面试】微服务篇 一、总体框架二、Springcloud&#xff08;一&#xff09;Springcloud五大组件&#xff08;二&#xff09;服务注册和发现1、Eureka2、Nacos &#xff08;三&#xff09;负载均衡1、Ribbon负载均衡流程2、Ribbon负载均衡策略3、自定义负载均衡策略4、总结 …...