基于MQTT协议实现微服务架构事件总线
一、场景描述
昨天在博客《客户端订阅服务端事件的实现方法》中提出了利用websocket、服务端EventEmitter和客户端mitt实现客户端订阅服务端事件,大大简化了客户端对服务端数据实时响应的逻辑。上述方案适用于单服务节点的情形。
对于由服务集群支撑的微服务架构,websocket提供的点对点通信已无法满足前端订阅后端集群事件的需求,升级方案是使用基于消息总线的通信方式。

二、几种消息总线适用性比较
常用的消息总线包括Kafka、Redis和基于MQTT协议实现的EMQX。
Kafka和MQTT都是从发布/订阅系统演化而来,但发展侧重点不同。Kafka通过分布式架构提供了海量数据流的存储,并保证数据流顺序,它的设计目标是支持数据发布、订阅和存储。而MQTT用于网络中传输小型数据包,其设计目的是实现简单、可靠的设备间通信。
而Redis是从内存数据库系统演化而来,发布/订阅功能是把消息保存在内存中。与Kafka相比,其只能提供半持久化;与MQTT相比,其通信效率较低。
由于应用场景没有对消息持久化的需求,且考虑到产业大脑平台未来会接入工业互联网,使用MQTT协议来搭建事件总线更利于平台在工业互联网环境下的扩展。
三、MQTT简介
几年前,本人曾写过MQTT简介。本文摘抄其中重要概念。
(一)MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是基于“订阅/发布”模式的轻量级通信协议,该协议基于TCP/IP,能以极低的带宽为海量(百万级)跨域设备提供可靠的消息服务,因此在物联网、小型移动终端、边缘计算方面有广泛应用。
所谓可靠的消息传输,体现为可配置消息的服务质量(QoS),有三种服务质量可选:
- 至多一次:
消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。应用场景如环境传感器的数据采集,丢失一次记录无所谓,因为不久后还会有第二次发送。 - 至少一次:
确保消息送达订阅者,但消息可能重复,适用于幂等性操作。 - 只有一次:
最严格的消息服务质量,确保消息到达且仅到达一次订阅者。应用场景如计费系统等。
MQTT协议中存在三种身份:消息总线(Broker)、发布者(Publish)和订阅者(Subscribe),其中消息总线属于服务器,后两者都属于客户端。发布者和订阅者可以是各种物联网设备和小型终端,消息发布者可以同时也是消息订阅者,如下图所示。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
- Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
- payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
订阅消息时,可以在订阅表达式中使用通配符筛选器对主题进行筛选,可同时订阅所匹配的多个主题。
MQTT协议中主要有以下5个方法:
- connect:客户端建立与服务器的连接
- disconnect:等待客户端完成工作后,端口与总线的会话
- subscribe:客户端向消息总线注册订阅主题
- unsubscribe:客户端等待消息总线取消所注册的订阅
- publish:客户端向消息总线发送某主题的消息
(二)开源消息总线EMQX
EMQX(Erlang/Enterprise/Elastic MQTT Broker),是基于Erlang语言开发的开源物联网MQTT消息总线。其是一款由前华为员工开发的开源软件,软件主页为https://www.emqx.io/。可根据操作系统类别选择不同版本下载安装,或通过docker部署。
软件安装后,通过 emqx start以后台方式启动。启动后将会开放两个端口:
- 18083端口为控制台端口,可通过浏览器访问该端口,首次登录的用户名和密码为admin和public。控制台提供了总线监控、用户权限管理、在线客户端订阅/发布等功能。
- 8083端口为通信端口,MQTT客户端可通过该端口与EMQX消息总线通信。
(三)MQTT.js客户端
MQTT.js是MQTT客户端Nodejs SDK,可在浏览器(ES模块)和Node.js环境(CommonJS模块)下使用,前者可通过MQTT over WebSocket使用,后者既可以通过MQTT over WebSocket使用,也可以直接使用MQTT。区别仅仅是连接参数的协议头不同。
1. 安装和帮助文件
$ pnpm i mqtt -S #安装
$ npx mqtt help #帮助
MQTT.js command line interface, available commands are:* publish publish a message to the broker* subscribe subscribe for updates from the broker* version the current MQTT.js version* help help about commandsLaunch 'mqtt help [command]' to know more about the commands.
2. 使用方法
// const mqtt = require('mqtt') //ES模块
import mqtt from 'mqtt' //CommonJS模块// 连接选项
const options = {clean: true, // true: 清除会话, false: 保留会话connectTimeout: 4000, // 超时时间// 认证信息clientId: 'user_id', // 要保证唯一性// 若在控制台配置了用户名和密码:// username: 'xxx',// password: 'xxx',
}// 连接字符串, 通过协议指定使用的连接方式
// ws 未加密 WebSocket 连接
// wss 加密 WebSocket 连接
// mqtt 未加密 TCP 连接
// mqtts 加密 TCP 连接
// wxs 微信小程序连接
// alis 支付宝小程序连接
const connectUrl = 'ws://localhost:8084/mqtt'
const client = mqtt.connect(connectUrl, options)client.on('reconnect', error => {console.error('正在重连:', error)
})client.on('error', error => {console.error('连接失败:', error)
})//收到消息
client.on('message', (topic, message) => {console.log('收到消息:', topic, message.toString()) //message是二进制流,需要转换成字符串
})//订阅主题
const topic='/user_id/#'
const qos=0 //0:最多交付1次;1:至少交付1次;2:只交付1次
client.subscribe(topic, qos, error=>{ //订阅user_id主题下所有消息if(error){console.error('订阅主题失败:', error)return}console.log('订阅成功')
})//发布消息
client.publish('user_id/a',JSON.stringify({a:123}),qos,error=>{if(error){console.error('发布消息失败:', error)}
})//取消订阅
client.unsubscrib(topic, qos, error=>{if(error){console.error('取消订阅失败', error)return}
})//断开连接
if(client.connected){try{client.end(false,()=>{console.log('成功断开连接')})catch(error){console.error('断开连接失败:', error)}}
}
(四)安全性
1. 排它订阅
排它订阅是 EMQX 支持的 MQTT 扩展功能。排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。
2. JWT认证
系统整体采用JWT认证方式,通过一台认证服务器颁发JWT Token。MQTT客户端访问EMQX总线时,携带由认证服务器颁发的JWT Token。
相关文章:
基于MQTT协议实现微服务架构事件总线
一、场景描述 昨天在博客《客户端订阅服务端事件的实现方法》中提出了利用websocket、服务端EventEmitter和客户端mitt实现客户端订阅服务端事件,大大简化了客户端对服务端数据实时响应的逻辑。上述方案适用于单服务节点的情形。 对于由服务集群支撑的微服务架构&…...
免费的Git图形界面工具sourceTree介绍
阅读本文同时请参阅-----代码库管理工具Git介绍 sourceTree是一款免费的Git图形界面工具,它简化了Git的使用过程,使得开发者可以更加方便地下载代码、更新代码、提交代码和处理冲突。下面我将详细介绍如何使用sourceTree进行这些操作。 1.下载和…...
【Appium UI自动化】pytest运行常见错误解决办法
通过Appium工具录制代码在pycharm上运行报错: 错误一: 1.提示 setup() 方法运行 error failed 解决办法:未创建 init __ 方法,创建一个空的__init.py文件就解决了。 原因: 错误二: 2.运行代码ÿ…...
IDEA如何开启Dashboard
普通的面板 Run Dashboard面板 修改配置文件 找到项目的.idea文件夹 点击编辑workspace.xml文件 添加下方代码 <component name"RunDashboard"><option name"ruleStates"><list><RuleState><option name"name" valu…...
【论文复现】——一种新的鲁棒三维点云平面拟合方法
目录 一、算法原理1、论文概述2、参考文献二、代码实现三、结果展示本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的GPT爬虫。 一、算法原理 1、论文概述 针对三维点云中的异常值和粗差点对平面拟合精度产生的影响,文章提出一…...
【C语言】学生宿舍信息管理系统
目录 项目说明 1. 数据结构设计 2. 功能实现 3. 主菜单设计 4. 文件操作 5. 系统使用 项目展示 1.主菜单功能界面 编辑 2.添加信息 3.查询信息 4.修改信息 5.删除信息 6.退出程序 项目完整代码 结语 在这篇博客中,我们将探讨如何使用C语言来开发…...
用Python插入页码到PDF文档
页码是许多类型文件中的重要内容,它能方便读者在文档中的导航。在创建PDF文档时,添加页码对于组织和引用内容特别有用。在本文中,我们将探讨如何利用Python程序高效地插入页码到PDF文档中,简化工作流程并创建出精美、结构合理的PD…...
LabVIEW光偏振态转换及检测仿真系统
LabVIEW光偏振态转换及检测仿真系统 随着光学技术的发展,光偏振态的研究与应用越来越广泛。为了深入理解光的偏振现象,开发了一套基于LabVIEW的光偏振态转换及检测仿真系统。该系统不仅能够模拟线偏振光、圆偏振光、椭圆偏振光等不同偏振态的产生与转换…...
scp 本地机和远程机传输文件的方法
在本地机器上,通过ssh连接到远程机器,如果想要在两个机器之间互相传输文件,那么可以使用scp。 scp运行的地方:本地机的终端 样例: 1 将本地文件filename1传输到远程主机的/home/username/filename2目录下 scp -r D:\…...
自定义神经网络二之模型训练推理
文章目录 前言模型概念模型是什么?模型参数有哪些神经网络参数案例 为什么要生成模型模型的大小什么是大模型 模型的训练和推理模型训练训练概念训练过程训练过程中的一些概念 模型推理推理概念推理过程 总结 前言 自定义神经网络一之Tensor和神经网络 通过上一篇…...
Java设计模式:单例模式之六种实现方式详解(二)
在Java中,单例模式是一种常见的设计模式,用于确保一个类只有一个实例,并提供一个全局访问点来获取该实例。单例模式在多种场景下都很有用,比如配置文件的读取、数据库连接池、线程池等。本文将详细介绍Java中实现单例模式的六种方…...
开创5G无线新应用:笙科电子5.8GHz 射频芯片
笙科电子(AMICCOM) 5.8GHz A5133射频芯片是一款专门设计用于在5.8GHz频率范围内(5725MHz - 5850MHz)进行射频信号处理的集成电路。这些集成电路通常包括各种功能模块,如射频前端、混合器、功率放大器、局部振荡器等,以支持无线通信系统的各种…...
使用 JMeter 生成测试数据对 MySQL 进行压力测试
博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,…...
C# cass10 面积计算
运行环境Visual Studio 2022 c# cad2016 cass10 通过面积计算得到扩展数据,宗地面积 ,房屋占地面积,房屋使用面积 一、主要步骤 获取当前AutoCAD应用中的活动文档、数据库和编辑器对象。创建一个选择过滤器,限制用户只能选择&q…...
中间件-Nginx漏洞整改(限制IP访问隐藏nginx版本信息)
中间件-Nginx漏洞整改(限制IP访问&隐藏nginx版本信息) 一、限制IP访问1.1 配置Nginx的ACL1.2 重载Nginx配置1.3 验证结果 二、隐藏nginx版本信息2.1 打开Nginx配置文件2.2 隐藏Nginx版本信息2.3 保存并重新加载Nginx配置2.4 验证结果2.5 验证隐藏版本…...
Xcode与Swift开发小记
文章目录 引子Xcode工程结构核心概念Swift语法速记(TODO)小技巧单元测试中使用awaitSwiftUI中使用ListView中取数据 常见问题Xcode添加package时连接github超时Xcode无法修改快捷键,一闪而过 引子 鉴于React Native目前版本在iOS上开发遇到诸多问题,本以…...
日更【系统架构设计师知识总结3】存储系统
【原创精华总结】自己一点点手打、总结的脑图,把散落在课本以及老师讲授的知识点合并汇总,反复提炼语言,形成知识框架。希望能给同样在学习的伙伴一点帮助!...
《TCP/IP详解 卷一》第7章 防火墙和NAT
7.1 引言 NAT通常改变源IP和源端口,不改变目的IP和目的端口。 7.2 防火墙 常用防火墙: 包过滤防火墙(packet-filter firewall) 代理防火墙(proxy firewall) 代理防火墙作用: 1. 通过代理服务…...
访问raw.githubusercontent.com失败问题的处理
1 问题 GitHub上的项目的有些资源是放在raw.githubusercontent.com上的,通常我们在安装某些软件的时候会从该地址下载资源,直接访问的话经常容易失败。 # 安装operator kubectl apply -f https://raw.githubusercontent.com/oceanbase/ob-operator/2.1…...
Elasticsearch的基本安装教程,Elasticsearch+SpringBoot实现简单的增删改查功能
Elasticsearch 是一个开源的分布式搜索和分析引擎,最初由 Elastic 公司开发。它是基于 Apache Lucene 的搜索引擎构建的,提供了强大的搜索和分析功能,并支持实时数据检索和分析。 Elasticsearch 被设计用来处理大规模的数据集,它具有以下几个主要特点: 分布式架构: Elast…...
从WWDC看苹果产品发展的规律
WWDC 是苹果公司一年一度面向全球开发者的盛会,其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具,对过去十年 WWDC 主题演讲内容进行了系统化分析,形成了这份…...
深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...
MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...
C#中的CLR属性、依赖属性与附加属性
CLR属性的主要特征 封装性: 隐藏字段的实现细节 提供对字段的受控访问 访问控制: 可单独设置get/set访问器的可见性 可创建只读或只写属性 计算属性: 可以在getter中执行计算逻辑 不需要直接对应一个字段 验证逻辑: 可以…...
Caliper 负载(Workload)详细解析
Caliper 负载(Workload)详细解析 负载(Workload)是 Caliper 性能测试的核心部分,它定义了测试期间要执行的具体合约调用行为和交易模式。下面我将全面深入地讲解负载的各个方面。 一、负载模块基本结构 一个典型的负载模块(如 workload.js)包含以下基本结构: use strict;/…...
上位机开发过程中的设计模式体会(1):工厂方法模式、单例模式和生成器模式
简介 在我的 QT/C 开发工作中,合理运用设计模式极大地提高了代码的可维护性和可扩展性。本文将分享我在实际项目中应用的三种创造型模式:工厂方法模式、单例模式和生成器模式。 1. 工厂模式 (Factory Pattern) 应用场景 在我的 QT 项目中曾经有一个需…...
React从基础入门到高级实战:React 实战项目 - 项目五:微前端与模块化架构
React 实战项目:微前端与模块化架构 欢迎来到 React 开发教程专栏 的第 30 篇!在前 29 篇文章中,我们从 React 的基础概念逐步深入到高级技巧,涵盖了组件设计、状态管理、路由配置、性能优化和企业级应用等核心内容。这一次&…...
JDK 17 序列化是怎么回事
如何序列化?其实很简单,就是根据每个类型,用工厂类调用。逐个完成。 没什么漂亮的代码,只有有效、稳定的代码。 代码中调用toJson toJson 代码 mapper.writeValueAsString ObjectMapper DefaultSerializerProvider 一堆实…...
深入解析光敏传感技术:嵌入式仿真平台如何重塑电子工程教学
一、光敏传感技术的物理本质与系统级实现挑战 光敏电阻作为经典的光电传感器件,其工作原理根植于半导体材料的光电导效应。当入射光子能量超过材料带隙宽度时,价带电子受激发跃迁至导带,形成电子-空穴对,导致材料电导率显著提升。…...
Linux入门(十五)安装java安装tomcat安装dotnet安装mysql
安装java yum install java-17-openjdk-devel查找安装地址 update-alternatives --config java设置环境变量 vi /etc/profile #在文档后面追加 JAVA_HOME"通过查找安装地址命令显示的路径" #注意一定要加$PATH不然路径就只剩下新加的路径了,系统很多命…...
