当前位置: 首页 > news >正文

基于Locust实现MQTT协议服务的压测脚本

一、背景简介

业务背景大概介绍一下,就是按照国标规定,车辆需要上传一些指定的数据到ZF的指定平台,同时车辆也会把数据传到企业云端服务上,于是乎就产生了一些性能需求。

目前我们只是先简单的进行了一个性能场景的测试,就是评估目前服务是否能够支持,预期的最大同时在线车辆上传数据。经过评估,在线车辆数据按照预期的10倍来进行的,并且后面增加持续运行12h查看服务链路的稳定性。

本篇并不是一个严谨的性能测试过程结果分享,主要是分享下关于mqtt协议服务的压测脚本的编写。因为之前我也没接触过MQTT协议的压测,网上关于相关的压测脚本的内容也比较杂乱,所以记录一下,仅供参考。

捋一下链路就知道需要生成哪些数据(因为服务还未上线使用,所以产生的压测数据后面可以直接清理掉即可。):

  1. 一些前置数据:比如数据库、缓存里涉及到的车辆数据,通信秘钥数据等等,这些可以之前写脚本一次性生成即可。
  2. 车辆上报的数据:车辆上报到云端的数据,是经过一系列加密转码,期间还要设计到解密等,这个经过评估,可以简化其中的某些环境,所以所有的车可以直接发送相同的数据即可。
  3. 车辆数据:最后就是生成对应的车辆数据,同时在线,按照评估的频率发送数据。

其中第1、2的数据在之前针对性的分别生成即可,第3步的车辆发送数据就是压测脚本要干的事情了。

二、技术选型

这个倒是很快,搜索引擎大概搜了一下,内容很少,或者说对我有用的内容很少。有看到jmeter有相关插件的,但是这个方案基本上我都是否决的,一来我不擅长用,而来我觉得用起来肯定会比自己编码要麻烦的多。

所以就继续编码好了,仍然首选python,想到了locust库,后来看官方文档的时候,看到locust也针对mqtt协议拓展了一些内容。但是我尝试下来不太符合我这的需求,也可能当时我用的不对吧,所以就只能自己来从零开始编写了。

搜索中又发现Python中用于mqtt协议的库叫paho.mqtt,支持连接代理,消息的订阅、收发等等,于是最后确定使用:locust+paho.mqtt的组合来实现本次的负载脚本。

三、代码编写

1. 脚本代码

暂时没做代码分层,目前场景简单,就直接都放一个模块里了,有点长,先贴上来,后面部分会对脚本的重点内容进行拆解。

脚本目前做了这些事情:

  • 从db中查询有效可用的所有测试车辆信息数据
  • 根据命令行的输入参数,指定启动的车辆数,以及与broker代理建立连接的频率
  • 建立连接成功的车辆,就可以根据脚本里指定的频次,来像broker发送数据
  • 脚本统计连接数、请求数、响应时间等信息写到报表中
  • 调试遇到车辆会批量断开连接的情况,增加了当车辆断开连接时,把断开时间、车辆信息写到本地csv中,方便第二天来查看分析。
import csv
import datetime
import queue
import os
import sys
import time
import sslfrom paho.mqtt import client as mqtt_client# 根据不同系统进行路径适配
if os.name == "nt":path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))sys.path.insert(0, path)from GB_test.utils.mysql_operating import DB
elif os.name == "posix":sys.path.append("/app/qa_test_app/")from GB_test.utils.mysql_operating import DBfrom locust import User, TaskSet, events, task, between, run_single_userBROKER_ADDRESS = "broker服务地址"
PORT = 1111
PASSWORD = "111111"
PUBLISH_TIMEOUT = 10000  # 超时时间
TEST_TOPIC = "test_topic"TEST_VALUE = [16, 3, -26, 4, 0, 36,.......]  # 用来publish的测试数据,仅示意BYTES_DATA = bytes(i % 256 for i in TEST_VALUE)  # 业务需要转换成 byte 类型后再发送# 创建队列
client_queue = queue.Queue()# 连接DB,读取车辆数据
db = DB("db_vmd")
select_sql = "select xxxx"  
client_list = db.fetch_all(select_sql)
print("车辆数据查询完毕,数据量:{}".format(len(client_list)))
for t in client_list:# 把可用的车辆信息存到队列中去client_queue.put(t)def fire_success(**kwargs):"""请求成功时调用"""events.request.fire(**kwargs)def calculate_resp_time(t1, t2):"""计算响应时间"""return int((t2 - t1) * 1000)class MQTTMessage:"""已发送的消息实体类"""def __init__(self, _type, qos, topic, payload, start_time, timeout):self.type = _type,self.qos = qos,self.topic = topicself.payload = payloadself.start_time = start_timeself.timeout = timeout# 统计总共发送成功的消息数量
total_published = 0
disconnect_record_list = []  # 定义存放连接断开的记录的列表容器class PublishTask(TaskSet):@taskdef task_publish(self):self.client.loop_start()topic = TEST_TOPICpayload = BYTES_DATA# 记录发送的开始时间start_time = time.time()mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)published_mid = mqtt_msg_info.mid# 将发送成功的消息内容,放入client实例的 published_message 字段self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,0,topic,payload,start_time,PUBLISH_TIMEOUT)# 发送成功回调self.client.on_publish = self.on_publish# 断开连接回调self.client.on_disconnect = self.on_disconnect@staticmethoddef on_disconnect(client, userdata, rc):""" broker连接断开,放入列表容器"""disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]disconnect_record_list.append(disconnected_info)print("rc状态:{} - -".format(rc), "{}-broker连接已断开".format(str(client._client_id)))@staticmethoddef on_publish(client, userdata, mid):if mid:# 记录消息发送成功的时间end_time = time.time()# 从已发送的消息容器中,取出消息message = client.published_message.pop(mid, None)# 计算开始发送到发送成功的耗时publish_resp_time = calculate_resp_time(message.start_time, end_time)fire_success(request_type="p_success",name="client_id: " + str(client._client_id),response_time=publish_resp_time,response_length=len(message.payload),exception=None,context=None)global total_published# 成功发送累加1total_published += 1class MQTTLocustUser(User):tasks = [PublishTask]wait_time = between(2, 2)def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)# 从队列中获取客户端 username 和 client_idcurrent_client = client_queue.get()self.client = mqtt_client.Client(current_client[1])self.client.username_pw_set(current_client[0], PASSWORD)# self.client.username_pw_set(current_client[0] + "1", PASSWORD)  # 模拟client连接报错# 定义一个容器,存放已发送的消息self.client.published_message = {}def on_start(self):# 设置tlscontext = ssl.SSLContext(ssl.PROTOCOL_TLS)self.client.tls_set_context(context)self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)self.client.on_connect = self.on_connectdef on_stop(self):print("publish 成功, 当前已成功发送数量:{}".format(total_published))if len(disconnect_record_list) == 0:print("无断开连接的client")else:# 把断开记录里的信息写入csvwith open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:writer = csv.writer(csvfile)writer.writerow(['client_id', 'rc_status', 'disconnected_time'])for i in disconnect_record_list:writer.writerow(i)print("断开连接的client信息已写入csv文件")@staticmethoddef on_connect(client, userdata, flags, rc, props=None):if rc == 0:print("rc状态:{} - -".format(rc), "{}-连接broker成功".format(str(client._client_id)))fire_success(request_type="c_success",name='count_connected',response_time=0,response_length=0,exception=None,context=None)else:print("rc状态:{} - -".format(rc), "{}-连接broker失败".format(str(client._client_id)))fire_success(request_type="c_fail",name="client_id: " + str(client._client_id),response_time=0,response_length=0,exception=None,context=None)if __name__ == '__main__':run_single_user(MQTTLocustUser)

2. 代码分析-locust库部分

并发请求能力还是使用的locust库的能力。官方只提供了http协议接口的相关类,没直接提供mqtt协议的,但是我们可以按照官方的规范,自定义相关的类,只要继承UserTaskSet即可。

User

首先是先定义User类,这里就是用来生成我要用来测试的车辆。

类初始化的时候,黄色框里,会去队列里取出车辆信息,用来做一些相关的设置。client来源于from paho.mqtt import client as mqtt_client提供的能力,固定用法,按照人家的文档使用就行。

红色框里,是User类的2个重要熟悉属性:

  • tasks: 这里定义了生成的用户需要去干哪些事情,也就是对应脚本里的PublishTask类下面定义的内容。
  • wait_time: 用户在执行task时间隔停留的时间,可以是个区间,在里面随机。我这里意思是每2s发送一次数据到broker。

绿色框里,定义了一个字典容器,用来存放当前用户已发送成功的消息内容,因为后面我要取出来把里面相关的数据写到生成的报表中去。

蓝色框里有2个方法,也是locust提供的能力:

  • on_start:当用户开始运行时调用,这里我做了车辆连接broker代理的处理,注意这里需要设置tls,因为服务连接需要。

  • on_stop:当用户结束运行时调用,这里我做了一些其他的处理,比如把运行期间断开连接的车辆信息写到本地csv中。

TaskSet

定义好User类,就需要来定义TaskSet类,你得告诉产生出来的用户,要干点啥。

我这根据业务需要,就是让车辆不停的像broker发送数据即可。

红色部分,同样是paho.mqtt提供的能力,会启动新的线程去执行你定义的事情。

黄色部分,就是做发送数据的操作,并且我可以拿到一些返回,查看源码就可以知道返回的是MQTTMessageInfo类。

注意返回的2个属性:

  • mid: 返回这个消息发送的顺序
  • rc: 表示发送的响应状态,0 就是成功

绿色部分,还记得我在上面的User类中定义了一个容器,在这里就把发送的消息相关信息放到容器中去,留着后面使用。

2. 代码分析-paho.mqtt库部分

上面的代码已经用到了不少paho.mqtt的能力,这里再进行整体梳理下。

  • client.Client():声明一个client
  • client.username_pw_set(): 设置客户端的用户名,密码
  • client.tls_set_context: 设置ssl模式
  • client.connect(): 连接代理
  • client.publish:向代理推送消息

还用到了一些回调函数:

  • on_connect:连接操作成功时回调
  • on_publish:发布成功时回调
  • on_disconnect:客户端与代理断开连接时回调

另外还用到了一个事件函数events.request

当客户端发送请求时会调用,不管是请求成功还是请求失败;当我需要自定义我的报告内容时,就需要用到这个event

查看源码,知道里面要传哪些参数,那我们在调用时候就需要传入对应的参数。

比如我在发送回调函数里调用了该方法。

所以最后在控制台显示的报告里就有我定义的内容了。

由于后来在使用中发现,不知道会在什么时候出现批量断开的情况,于是在on_disconnect回调函数里增加了对应处理,把相关的断开信息记录下来,运行结束的时候写到本地文件里去。

后来我主动尝试客户端断开的情况测试了下文件的写入结果,功能正常。

三、小结

后面就开始运行了,在运行过程中,开发关注链路服务的各项指标,这里就不展开了,业务缠身就并没有过多的去做这个事情,况且也不专业。确实也发现了不少问题,后面逐步优化,再继续测试。

现在稳定运行12h,服务正常,暂时就先告一段落了。后面还有会相关其他性能测试场景,届时就可以针对性的展开分享下了。

另外,这个脚本分享也只是仅供参考,现在我这是使用简单,本着能用就行,可能存在一些不合理需要优化的地方,有需要的朋友还请自行查阅相关文档。

相关文章:

基于Locust实现MQTT协议服务的压测脚本

一、背景简介 业务背景大概介绍一下,就是按照国标规定,车辆需要上传一些指定的数据到ZF的指定平台,同时车辆也会把数据传到企业云端服务上,于是乎就产生了一些性能需求。 目前我们只是先简单的进行了一个性能场景的测试&#xf…...

AURIX TC3XX Cached PFLASH与Non-Cached PFLASH的区别

Cached ? Non-Cached? 在阅读TC3XX的用户手册时,在内存映射表中,有两个segment都是Program Flash,而且大小都一样是3M,一个是segment 8 另一个是segment10 这难免让人产生疑惑,二者区别在哪? …...

uniapp开发小程序-显示左滑删除效果

一、效果图&#xff1a; 二、代码实现&#xff1a; <template><view class"container"><view class"myorderList"><uni-swipe-action><uni-swipe-action-item class"swipe-action-item" :right-options"option…...

FPGA 的数字信号处理:Verilog 实现简单的 FIR 滤波器

该项目介绍了如何使用 Verilog 实现具有预生成系数的简单 FIR 滤波器。 绪论 不起眼的 FIR 滤波器是 FPGA 数字信号处理中最基本的模块之一&#xff0c;因此了解如何将具有给定抽头数及其相应系数值的基本模块组合在一起非常重要。因此&#xff0c;在这个关于 FPGA 上 DSP 基础…...

使用粒子群优化算法(PSO)辨识锂电池二阶RC模型参数(附MATLAB代码)

目录 一、原理部分 二、代码详解部分 三、结果及分析 一、原理部分 PSO算法由美国学者于 1995 年提出&#xff0c;因其算法简单、效果良好&#xff0c;而在很多领域得到了广泛应用。该算法的起源是模拟鸟群的觅食过程&#xff0c;形成一种群体智能搜索算法。 其核心是&#…...

如何利用地面控制点实现倾斜摄影三维模型数据的几何坐标变换和纠正?

如何利用地面控制点实现倾斜摄影三维模型数据的几何坐标变换和纠正&#xff1f; 倾斜摄影是一种在空中拍摄地表物体的技术&#xff0c;可以获得高分辨率、高精度的三维模型数据&#xff0c;广泛应用于城市规划、建筑设计、土地管理等领域。然而&#xff0c;由于航拍时无法避免姿…...

设计规则之里氏替换原则

tip: 作为程序员一定学习编程之道&#xff0c;一定要对代码的编写有追求&#xff0c;不能实现就完事了。我们应该让自己写的代码更加优雅&#xff0c;即使这会费时费力。 相关规则&#xff1a; 推荐&#xff1a;体系化学习Java&#xff08;Java面试专题&#xff09; 1.6大设…...

【叠高高】叠蛋糕游戏的微信小程序开发流程详解

记得小时候玩过的搭积木游戏吗&#xff0c;和叠高高游戏原理差不多的&#xff0c;与之类似的还有盖高楼游戏&#xff0c;就是看谁盖的&#xff08;叠的&#xff09;最高&#xff0c;这里讲一下比较基础的叠高高游戏小程序实现过程&#xff0c;对编程感兴趣的同学可以参考学习一…...

收集关键词的方法有哪些?(如何查找精准的行业流量关键词)

关键词的收集通常可以通过以下几种方法: 关键词收集方法 1.根据市场价值、搜索词竞争性和企业实际产品特征进行筛选&#xff1a;确定您的关键词列表之前&#xff0c;建议先进行市场分析&#xff0c;了解您的竞争对手、行业状况和目标受众等信息&#xff0c;以更好地了解所需的特…...

【GreenDao】RxQuery查询并修改GreenDao数据库,完成后更新UI

GreenDao是一个轻量级的ORM&#xff08;对象关系映射&#xff09;数据库&#xff0c;而RxJava是一个响应式编程库&#xff0c;可以帮助我们更轻松地处理异步事件。在 Android 应用程序中&#xff0c;您可以使用这两个库一起处理数据库查询和更新&#xff0c;并使用观察者模式来…...

Modifier ‘public‘ is redundant for interface methods错误

java中接口的方法默认是 public abstract 的 所以放心的删掉public即可&#xff0c;如果改为protected 或者 private还会报错 接口的方法及变量的默认修饰符 1.接口中每一个方法也是隐式抽象的,接口中的方法会被隐式的指定为 public abstract &#xff08;只能是 public abst…...

Redis缓存击穿及解决问题

缓存击穿的意思是对于设置了过期时间的key,缓存在某个时间点过期的时候&#xff0c;恰好这时间点对这个 Key有大量的并发请求过来&#xff0c;这些请求发现缓存过期- -般都会从后端DB加载数据并回设到缓存&#xff0c;这个时候大并发的请求可能会瞬间把DB压垮。 解决方案有两种…...

环境感知算法——2.CenterNet基于KITTI数据集训练

1. CenterNet简介 CenterNet采用了一种新的检测思路&#xff0c;即以目标中心点为基础&#xff0c;直接回归出目标的位置和大小。而传统的目标检测算法通常会先产生大量候选框&#xff08;Anchor&#xff09;&#xff0c;再通过分类器进行筛选&#xff0c;这种方法比较复杂。C…...

JUC 高并发编程基础篇

JUC 高并发编程基础篇 • 1、什么是 JUC • 2、Lock 接口 • 3、线程间通信 • 4、集合的线程安全 • 5、多线程锁 • 6、Callable 接口 • 7、JUC 三大辅助类: CountDownLatch CyclicBarrier Semaphore • 8、读写锁: ReentrantReadWriteLock • 9、阻塞队列 • 10、ThreadPo…...

【十二】设计模式~~~行为型模式~~~命令模式(Java)

命令模式-Command Pattern【学习难度&#xff1a;★★★☆☆&#xff0c;使用频率&#xff1a;★★★★☆】 1.1. 模式动机 在软件设计中&#xff0c;我们经常需要向某些对象发送请求&#xff0c;但是并不知道请求的接收者是谁&#xff0c;也不知道被请求的操作是哪个&#xf…...

可再生能源的不确定性和储能系统的时间耦合的鲁棒性和非预期性区域微电网的运行可行性研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

Revit中如何使创建的族文件内存变小

族文件的大小直接影响到项目文件的大小和软件运行速度&#xff0c;如何将族文件做的最小并且满足项目需求呢? 方法一&#xff1a;清除未使用项 1. 族制作完成可以把族文件中未用到的外部载入族或其他多余数据删掉&#xff0c;点击“管理”选项卡下拉的“清除未使用项”命令; 2…...

ClassLoader源码

介绍 ClassLoader 顾名思义就是类加载器 ClassLoader 是一个抽象类 没有父类 作用 1.负责将 Class 加载到 JVM 中 2.审查每个类由谁加载&#xff08;父优先的等级加载机制&#xff09; 3.将 Class 字节码重新解析成 JVM 统一要求的对象格式 常量&变量 //注册本地方法…...

Kafka分区消息积压排查指南

针对某个TOPIC只有几个分区积压的场景&#xff0c;可以采用以下方法进行排查&#xff1a; 消息生产是否指定key&#xff1f; 如果指定了消息key&#xff0c;那么消息会指定生产到hash(key)的分区中。如果指定了key&#xff0c;那么有下列几种可能&#xff1a; 生产该key的消息体…...

数据库 期末复习(4) 概念数据库的设计

第一部分 为啥要引入概念数据库 感觉只有一个重点 实体联系模型----ER模型 第二部分-----实体联系模型 这个例子可以全看完之后再来看 举个例子:根据COMPANY数据库的需求来构造数据库模式:The company is organized into DEPARTMENTs. Each department has a name, number …...

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…...

第19节 Node.js Express 框架

Express 是一个为Node.js设计的web开发框架&#xff0c;它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用&#xff0c;和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...

【杂谈】-递归进化:人工智能的自我改进与监管挑战

递归进化&#xff1a;人工智能的自我改进与监管挑战 文章目录 递归进化&#xff1a;人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管&#xff1f;3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

Map相关知识

数据结构 二叉树 二叉树&#xff0c;顾名思义&#xff0c;每个节点最多有两个“叉”&#xff0c;也就是两个子节点&#xff0c;分别是左子 节点和右子节点。不过&#xff0c;二叉树并不要求每个节点都有两个子节点&#xff0c;有的节点只 有左子节点&#xff0c;有的节点只有…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

GC1808高性能24位立体声音频ADC芯片解析

1. 芯片概述 GC1808是一款24位立体声音频模数转换器&#xff08;ADC&#xff09;&#xff0c;支持8kHz~96kHz采样率&#xff0c;集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器&#xff0c;适用于高保真音频采集场景。 2. 核心特性 高精度&#xff1a;24位分辨率&#xff0c…...

Mobile ALOHA全身模仿学习

一、题目 Mobile ALOHA&#xff1a;通过低成本全身远程操作学习双手移动操作 传统模仿学习&#xff08;Imitation Learning&#xff09;缺点&#xff1a;聚焦与桌面操作&#xff0c;缺乏通用任务所需的移动性和灵活性 本论文优点&#xff1a;&#xff08;1&#xff09;在ALOHA…...