emqx桥接配置+常见问题解决+jmeter压测emqx
一,桥接资源配置及规则配置
Emqx桥接配置流程
1,配置资源并测试连接通过
规则引擎——>资源——>新建——>选择MQTT Bridge——>填写参数测试连接
参数描述详见3.1资源配置
2,配置规则
2.1根据实际业务选择合适sql
规则引擎——>规则——>新建——>规则sql
(sql见06手册/实施部署手册/empx 桥接规则配置模版.xlsx)
2.2填写规则id
“rule:当前节点_upload_目标节点”;例如“rule:yantai_upload_shandong”
2.3添加响应动作
动作:选择“桥接数据到 MQTT Broker”
关联资源:选择配置好的目标资源节点(没有目标资源点击新建资源去新建)
转发消息主题:空着即可
(转发消息时使用的主题。如果未提供,则默认为桥接消息的主题)
消息内容模板: 填写”${payload}”
(支持变量。若使用空模板(默认),消息内容为 JSON 格式的所有字段)
3,参数配置
3.1资源配置
1,资源类型:下拉选择MQTT Bridge
2,资源ID:
一对一:“resource:”+当前节点_to_目标节点;例如“resource:yantai_to_shandong”
一对多:“resource:”+当前节点_to_目标节点;例如“resource:guojia_to_provinces”
3,连接池大小:设为默认值8
4,客户端id:当前节点+client;例如“yantai_client”
5,附加GUID:设为默认值true(附加 GUID 选项,设置为 true 时,MQTT 连接使用的 clientid 增加随机后缀以保证全局唯一性。 设置为 false 时,会导致 clientid 使用同一个,连接池中线程互踢,EMQX 多个节点之间的桥接也会互踢,推荐仅在单节点 EMQX 且连接池大小为 1 时开启此选项。)
6,用户名:连接远程Broker的用户名
7,密码: 连接远程Broker的密码
8,桥接主题的挂载点:示例: 本地节点向 topic1
发消息,远程桥接节点的主题会变换为 bridge/aws/${node}/topic1
,程序中应设置为空
9,磁盘缓存:设为默认值off
10,协议版本:设为默认值mqttV4
11,心跳间隔:设为默认值60s
12,重连间隔:设为默认值30s
13,重传间隔:默认值20s
14,桥接模式:false
15,开启SSL连接:false
16,服务器名称知识:指定用于对端证书验证时使用的主机名,
或者设置为 disable 以关闭此项验证。(默认不填即可)
注意:配置完毕点击测试连接,显示连接成功即可应用
二,过多的消息发布
ERROR,MQTT(32202): 正在发布过多的消息
解决方案
1,增大maxInflight(最低需要paho1.2.0版本)
2,配置多个mqtt client
由于mqttMessageHandler只会引用一个paho客户端,并且在内部对paho客户端做了封装,所以直接修改MqttPahoMessageHandler复杂度较高,我们可以重新写一个MultiMqttMessageHandler,内部初始化多个MqttPahoMessageHandler,这样通过MessageingGateway发送消息时,直接通过MultiMqttMessageHandler来处理mqtt消息,MultiMqttMessageHandler可以通过负载均衡的方式来把消息分派给各个MqttPahoMessageHandler
1,自定义MyMqttPahoMessageHandler类,继承MqttPahoMessageHandler,注意权限由protected改成public。handleMessageInternal()会由channel通过dispatcher间接调用;重写onInit()用来手动初始化MqttPahoMessageHandler。
@Overridepublic void doStop() {super.doStop();}@Overridepublic void handleMessageInternal(Message<?> message) throws Exception {super.handleMessageInternal(message);}@Overridepublic void onInit() {try {super.onInit();} catch (Exception e) {e.printStackTrace();}}
2,自定义MultiMqttMessageHandler类,继承AbstractMessageHandler,并implements Lifecycle,自定义一个MessageHandler,添加一个Map成员属性,用来维系多个MyMqttPahoMessageHandler;handlerCount变量可配置多个mqtt client。这里只用了radom随机数来做负载均衡
private final AtomicBoolean running = new AtomicBoolean();
private volatile Map<Integer, MessageHandler> mqttHandlerMap;@Value("${spring.mqtt.sender.count}")
private Integer handlerCount;@Autowired
private MqttSenderConfig senderConfig;@Override
public void start() {if (!this.running.getAndSet(true)) {doStart();}
}private void doStart(){mqttHandlerMap = new ConcurrentHashMap<>();for(int i=0;i<handlerCount;i++){mqttHandlerMap.put(i, senderConfig.createMqttOutbound());}
}@Override
public void stop() {if (this.running.getAndSet(false)) {doStop();}
}private void doStop(){for(Map.Entry<Integer, MessageHandler> e : mqttHandlerMap.entrySet()){MessageHandler handler = e.getValue();((MyMqttPahoMessageHandler)handler).doStop();}
}@Override
public boolean isRunning() {return this.running.get();
}@Override
protected void handleMessageInternal(Message<?> message) throws Exception {Random random = new Random();MyMqttPahoMessageHandler messageHandler = (MyMqttPahoMessageHandler)mqttHandlerMap.get(random.nextInt(handlerCount));messageHandler.handleMessageInternal(message);
}
3,消息发布配置类
@Bean
public MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setServerURIs(hostUrl);factory.setUserName(username);factory.setPassword(password);return factory;
}
public MessageHandler createMqttOutbound(){String tempId = MqttAsyncClient.generateClientId();MyMqttPahoMessageHandler messageHandler = new MyMqttPahoMessageHandler(clientId + "sender" + tempId, mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);messageHandler.setDefaultQos(1);messageHandler.onInit();return messageHandler;
}@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {return new MultiMqttMessageHandler();
}@Bean
public MessageChannel mqttOutboundChannel() {return new DirectChannel();
}@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
三,报文内容过大
调整emqx参数值
zone.external.max_packet_size
mqtt.max_packet_size
四,队列已满
调整emqx参数值
zone.external.max_mqueue_len
消息队列最大长度。当飞行窗口满,或客户端离线后,消息会被存储至该队列中。0 表示不限制。
五,发送大消息的时候,客户端会被强制kill掉
emqx升级到4.4.10版本之后
六,其他重点相关优化参数
参数介绍api链接
https://www.emqx.io/docs/zh/v4.3/configuration/configuration.html#cluster
//集群节点发现方式。可选值为:manual: 手动加入集群static: 配置静态节点。配置几个固定的节点,新节点通过连接固定节点中的某一个来加入集群。mcast: 使用 UDP 多播的方式发现节点。dns: 使用 DNS A 记录的方式发现节点。etcd: 使用 etcd 发现节点。k8s: 使用 Kubernetes 发现节点。
cluster.discovery//指定多久之后从集群中删除离线节点。
cluster.autoclean//当使用 static 方式集群时,指定固定的节点列表,多个节点间使用逗号分隔
cluster.static.seeds//节点名。格式为 <name>@<host>。其中 <host> 可以是 IP 地址,也可以是 FQDN:注意格式限制
node.name//系统调优参数,设置 Erlang 允许的最大进程数,这将影响 emqx 节点能处理的连接数
//integer 1024 - 134217727 默认:2097152
node.process_limit//系统调优参数,设置 Erlang 允许的最大 Ports 数量
//integer 1024 - 134217727 1048576
node.max_ports//系统调优参数,设置 Erlang 分布式通信使用的最大缓存大小
//bytesize 1KB - 2GB 8MB
node.dist_buffer_size//系统调优参数,设置 Erlang 运行时允许的最大 ETS 表数量 integer 默认262144
node.max_ets_tables//系统调优参数,设置 Erlang 运行多久强制进行一次全局垃圾回收。默认15m
node.global_gc_interval//系统调优参数,设置 Erlang 运行时多少次 generational GC 之后才进行一次 fullsweep GC。
//integer 0 - 65535 默认:1000
node.fullsweep_after//系统调优参数,当一个节点持续无响应多久之后,认为其已经宕机并断开连接 默认120node.dist_net_ticktime//MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。
mqtt.retain_available//是否忽略自己发送的消息:默认false
mqtt.ignore_loop_deliver//当收到一定数量的消息,或字节,就强制执行一次垃圾回收。
//16000|16MB 表示当收到 16000 条消息,或 16MB 的字节流入就强制执行一次垃圾回收
zone.external.force_gc_policy//允许客户端订阅主题的最大层级。0 表示不限制,层级多会有性能问题zone.external.max_topic_levels//飞行窗口大小。飞行窗口用于存储未被应答的 QoS 1 和 QoS 2 消息
zone.external.max_inflight//消息重发间隔。EMQX 在每个间隔检查是否需要进行消息重发
zone.external.retry_interval//消息队列是否存储 QoS 0 消息。
zone.external.mqueue_store_qos0//ACL机制
MQTT 授权(authorization)是指对 MQTT 客户端的发布和订阅操作进行 权限控制。 控制的内容主要是哪些客户端可以发布或者订阅哪些 MQTT 主题。
EMQX 支持集中类型的授权。权限列表(亦即 ACL)。可以从例如 MongoDB, MySQL,PostgreSQL,Redis,或者 EMQX 的内置数据库中读取这个列表。加载一个包含全局的 ACL 的文件。动态访问一个 HTTP 后端服务,并通过该 HTTP 调用的返回值来客户端是否有访问的权限。通过提取认证过程中携带的授权数据,例如 JWT 的某个字段。
EMQX 最大文件句柄数done:ulimit -n 1048576done: /etc/security/limits.confdone: /etc/sysctl.confdone: /etc/systemd/system.confdone: 重启 emqx 服务:ulimit -n 1048576; ./emqx stop; ./emqx startdone: 确认 EMQX Web 后台显示
tcp并发数
/etc/systemd/system.conf查看默认值$ systemctl --user show syncthing | grep LimitNOFILELimitNOFILE=4096LimitNOFILESoft=1024设置DefaultLimitNOFILE=1048576
7,jmeter压测emqx
1. 下载jmeter,解压
https://jmeter.apache.org/download_jmeter.cgi
以 5.4.3 为例,下载地址: https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.4.3.zip
linux下解压: unzip apache-jmeter-5.4.3.zip
2. 下载mqtt-jmeter插件
下载地址:
https://github.com/emqx/mqtt-jmeter/releases
https://github.com/emqx/mqtt-jmeter/releases/download/v2.0.2/mqtt-xmeter-2.0.2-jar-with-dependencies.jar
3. 将插件放置于jmeter的lib/ext目录下,windows/linux同样操作
4. 本文先在windows下生成的jmx脚本,然后传至linux下使用
4.1 新建两个线程组
第一个仅包含一个 MQTT DisConnect,执行一次
第二个里面包含具体的压测,开启1000个线程,1s内将线程创建完毕,无限循环。创建两个计数器,pub_counter用来技术发布消息数,thread_counter用来线程计数
4.2 事先创建1000个设备,名称为cosmoiottest000001 - cosmoiottest000001000(可自己定义)。添加一次性控制器(mqtt连接一次,后续pub消息),写上配置信息。
4.3 添加循环控制器,循环一次。包含固定定时器,休眠1000ms,一个发布MQTT Pub Sampler,即每个线程进来执行一次发布消息然后休眠1000ms进入下一次循环。每个消息包含100个点位(根据自己需要设置),每个点位随机生成一个整数。配置详见截图
4.4 添加观察结果树、汇总报告、聚合报告等,可在windows下面查看结果
4.5 配置截图如下:
循环执行线程

chmod +x bin/jmeter
./bin/jmeter -n -t mqtt_test.jmx -l result.jtl
6. 将结果jtl生成可视化报告,放置于result目录
mkdir result
./bin/jmeter -g result.jtl -o result
将结果目录拉下来,点开即可查看图形化结果
注,可能遇到问题:
1. 执行jmeter压测后,进程不退出,编辑 jmeter.properties,打开配置
jmeterengine.force.system.exit=true
2. jmx文件传到linux后可能出错,建议英文环境下生成jmx文件,语言控制jmeter.properties
#language=en (默认英文,切换为中文为:zh_CN)
3. mqtt-jmeter 的jar包需要传至lib/ext目录,否则不可用
4. 生成报告时报错:Consumer failed with message :Begin size 0 is not equal to fixed size 5
将jdk换成8版本
5. jtl结果文件,也可拉到windows,使用jmeter直接查看,新建线程组->聚合报告,选择jtl文件
相关文章:

emqx桥接配置+常见问题解决+jmeter压测emqx
一,桥接资源配置及规则配置 Emqx桥接配置流程 1,配置资源并测试连接通过 规则引擎——>资源——>新建——>选择MQTT Bridge——>填写参数测试连接 参数描述详见3.1资源配置 2,配置规则 2.1根据实际业务选择合适sql 规则引擎…...

improve-1
类型及检测方式 1. JS内置类型 JavaScript 的数据类型有下图所示 其中,前 7 种类型为基础类型,最后 1 种(Object)为引用类型,也是你需要重点关注的,因为它在日常工作中是使用得最频繁,也是需要…...

华为OD机试用Python实现 -【云短信平台优惠活动】(2023-Q1 新题)
华为OD机试题 华为OD机试300题大纲云短信平台优惠活动题目描述输入描述输出描述示例一输入输出说明示例二输入输出说明Python 代码实现代码编写思路华为OD机试300题大纲 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高。 华为 OD 清单查看…...

Facebook广告投放运营中的关键成功因素是什么?
在当今数字化的时代,广告投放已经成为了各种企业获取市场份额和增加品牌曝光的重要手段之一。Facebook作为全球最大的社交媒体平台之一,其广告投放运营的成功,将直接影响企业的品牌推广和市场营销效果。本文将探讨Facebook广告投放运营中的关…...

2023年1月综合预订类APP用户洞察——旅游市场复苏明显,三年需求春节集中释放
2023年1月,随着国家对新型冠状病毒感染实施“乙类乙管”,不再对入境人员和货物等采取检疫传染病管理措施,并且取消入境后全员核酸检测和集中隔离,横亘在旅游者与旅游目的地之间的隔阂从此彻底消失。2023年1月恰逢春节假期…...

基于stm32计算器设计
这里写目录标题 完整de代码可q我获取1 系统功能设计2 系统硬件系统分析设计2.1 STM32单片机核心电路设计2.2 LCD1602液晶显示模块电路设计2.3 4X4矩阵键盘模块设计3 STM32单片机系统软件设计3.1 编程语言选择3.2 Keil程序开发环境3.3 FlyMcu程序烧录软件介绍3.4 CH340串口程序烧…...
基于SpringCloud的可靠消息最终一致性02:项目骨架代码(上)
在上一节中咱们已经把分布式事务问题交代了一遍,包括两大定理、五大解决方案和一个成熟的开源框架,而咱们最终的目标是用Spring Cloud实现一个实际创业项目的可靠消息最终一致性的分布式事务方案。 先交代一下项目背景。 前几年,社会上慢慢兴起一种称为C2C同城快递的业务,也…...

RockerMQ集群部署
目录一、Broker集群模式1、单Master:2、多Master多Slave模式异步复制3、多Master多Slave模式同步双写二、集群搭建实践1、集群架构2、克隆生成rocketmqos13、修改rocketmqos1配置文件4、克隆生成rocketmqOS25、修改rocketmqOS2配置文件6、启动服务器7、测试一、Brok…...

unicloud的aggregate聚合查询时间戳转日期
我特么不知道看了这个帖子几百遍才看明白到-----》unicloud数据库中,聚合操作如何操作时间戳? - DCloud问答 自己淋过雨老想着为别人撑伞,可怜我这35岁的老人家,给我去点关注!!!!&a…...

Vue2.0开发之——使用ref引用组件实例(41)
一 概述 在本组件内部修改count的值在父组件内修改子组件的count值 二 在本组件内部修改count的值 2.1 Left.vue 布局代码 <template><div class"left-container"><h3 >Left 组件---{{count}}</h3><button click"count 1"&…...
极狐GitLab仓库瘦身
参考文章: [分享] 极狐GitLab仓库瘦身 - 官方技术分享 - 极狐GitLab 论坛 一、瘦身概述 Git仓库随着时间推移会变得越来越大,比如很多比较大的文件加入Git仓库时,可能引起以下问题: 下载仓库越来越慢,因为每个人都…...

2288hv5超融合服务器 数码管报888
【问题现象】 2288hv5超融合服务器,前面板数码管报888,电源灯黄灯闪烁,开不了机,ibmc网络是通的,但是web网页打不开 【问题原因】 iBMC的版本过低,iBMC在智能诊断数据库保护机制存在异常,导…...
【Zabbix实战之部署篇】Zabbix监控windows系统配置方法
【Zabbix实战之部署篇】Zabbix监控windows系统配置方法 一、检查Zabbix监控平台状态1.检查Zabbix各组件状态2.检查Zabbix的首页二、下载windows代理1.访问Zabbix官网下载界面2.查看下载安装包三、安装windows agent2代理1.安装windows agent2代理2.代理基本配置信息3.开始进行安…...

在Windows上编译Nginx
《在Windows上编译Nginx》视频教程官方编译说明 Building nginx on the Win32 platform with Visual C 环境准备 1. Microsoft Visual Studio(Microsoft Visual C 编译器),下载地址:https://visualstudio.microsoft.com/zh-hans/。 2. Git(备用)&…...

语音识别与Python编程实践
博主简介 博主是一名大二学生,主攻人工智能研究。感谢让我们在CSDN相遇,博主致力于在这里分享关于人工智能,c,Python,爬虫等方面知识的分享。 如果有需要的小伙伴可以关注博主,博主会继续更新的,…...

MATLAB绘制泰勒图(Taylor diagram)
泰勒图(Taylor diagram) 泰勒图是Karl E. Taylor于2001年首先提出,主要用来比较几个气象模式模拟的能力,因此该表示方法在气象领域使用最多,但是在其他自然科学领域也有一定的应用。 泰勒图常用于评价模型的精度&…...

ClickHouse高可用集群分片-副本实操(四)
目录 一、ClickHouse高可用之ReplicatedMergeTree引擎 二、 ClickHouse高可用架构准备-环境说明和ZK搭建 三、高可用集群架构-ClickHouse副本配置实操 四、ClickHouse高可用集群架构分片 4.1 ClickHouse高可用架构之两分片实操 4.2 ClickHouse高可用架构之两分片建表实操 一…...

2022年中国工业机器人行业市场回顾及2023年发展前景预测分析
工业机器人是一种能自动定位控制、可重复编程的、多功能的、多自由度的操作机,广泛应用于码垛、冲压、分拣、焊接、切割、喷涂、上下料等工业场景中,极大提高了生产效率、安全性以及智能化水平。工业机器人作为我国高端制造业的典型代表,近年…...

Gehpi的网络布局
Gehpi的网络布局1. 力引导布局2. 辅助布局布局是网络可视化中的重要概念,指将点和边通过某种策略进行排布,应尽可能满足以下4个原则: 节点均匀分布在有限的区域内避免边的交叉和弯曲保持边的长度一致整体布局能反映图内在的特性 Gephi的布局…...

华为OD机试用Python实现 -【天然蓄水库 or 天然蓄水池】(2023-Q1 新题)
华为OD机试题 华为OD机试300题大纲天然蓄水库 or 天然蓄水池题目描述输入描述输出描述说明示例一输入输出说明示例二输入输出说明示例三输入输出说明Python 代码实现算法思路华为OD机试300题大纲 参加华为...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...
【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验
系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...

在WSL2的Ubuntu镜像中安装Docker
Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...

均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...
LeetCode - 199. 二叉树的右视图
题目 199. 二叉树的右视图 - 力扣(LeetCode) 思路 右视图是指从树的右侧看,对于每一层,只能看到该层最右边的节点。实现思路是: 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲
文章目录 前言第一部分:体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分:体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...

STM32---外部32.768K晶振(LSE)无法起振问题
晶振是否起振主要就检查两个1、晶振与MCU是否兼容;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容(CL)与匹配电容(CL1、CL2)的关系 2. 如何选择 CL1 和 CL…...