当前位置: 首页 > news >正文

性能测试 - Locust WebSocket client

Max.Bai

2024.10

0. 背景

Locust 是性能测试工具,但是默认只支持http协议,就是默认只有http的client,需要其他协议的测试必须自己扩展对于的client,比如下面的WebSocket client。

1. WebSocket test Client
“”“
Max.Bai
Websocket Client
”“”
import json
import logging
import secrets
import threading
import time
from typing import Callable, Optionalimport websocket
from locust import eventslogger = logging.getLogger(__name__)class WebSocketClient:def __init__(self, host: str, log_messages: bool = False):self._host: str = hostself._id: str = secrets.token_hex(8)self._alias: Optional[str] = Noneself._ws: Optional[websocket.WebSocketApp] = Noneself.log_messages = log_messagesself.count_recv_type = Falseself.heartbeat_auto_respond = Falseself._recv_messages: list = []self.messages: list = []self._sent_messages: list = []def __enter__(self):self.connect()return selfdef __exit__(self, type, value, traceback):self.disconnect()@propertydef tag(self) -> str:tag = f"{self._host} <{self._id}>"if self._alias:tag += f"({self._alias})"return tagdef connect(self, alias: Optional[str] = None, headers: Optional[dict] = None, on_message: Optional[Callable] = None):if not self._ws:self._alias = aliasself._ws = websocket.WebSocketApp(url=self._host,header=headers,on_open=self._on_open,on_message=on_message if on_message else self._on_message,on_close=self._on_close,)thread = threading.Thread(target=self._ws.run_forever)thread.daemon = Truethread.start()time.sleep(3)else:logger.warning("An active WebSocket connection is already established.")def is_connected(self) -> bool:return self._ws is not Nonedef disconnect(self):if self._ws:self._ws.close()self._alias = Noneelse:logger.warning("No active WebSocket connection established.")def _on_open(self, ws):logger.debug(f"[WebSocket] {self.tag} connected.")events.request.fire(request_type="ws_client",name="connect",response_time=0,response_length=0,)def _on_message(self, ws, message):recv_time = time.time()recv_time_ms = int(recv_time * 1000)recv_time_ns = int(recv_time * 1000000)logger.debug(f"[WebSocket] {self.tag} message received: {message}")if self.log_messages:self._recv_messages.append(message)self.messages.append(message)# public/respond-heartbeatif self.heartbeat_auto_respond:if "public/heartbeat" in message:self.send(message.replace("public/heartbeat", "public/respond-heartbeat"))if self.count_recv_type:try:msg = json.loads(message)id = str(msg.get("id", 0))if len(id) == 13:resp_time = recv_time_ms - int(id)elif len(id) == 16:resp_time = (recv_time_ns - int(id)) / 1000elif len(id) > 13:resp_time = recv_time_ms - int(id[:13])else:resp_time = 0method = msg.get("method", "unknown")code = msg.get("code", "unknown")error = msg.get("message", "unknown")# send_time = int(msg.get("nonce", 0))if method in ["public/heartbeat", "private/set-cancel-on-disconnect"]:events.request.fire(request_type="ws_client",name=f"recv {method}",response_time=0,response_length=len(msg),)elif code == 0:events.request.fire(request_type="ws_client",name=f"recv {method} {code}",# response_time=recv_time - send_time,response_time=resp_time,response_length=len(msg),)else:events.request.fire(request_type="ws_client",name=f"recv {method} {code}",response_time=resp_time,response_length=len(msg),exception=error,)except Exception as e:events.request.fire(request_type="ws_client",name="recv error",response_time=0,response_length=len(msg),exception=str(e),)def _on_close(self, ws, close_status_code, close_msg):logger.debug(f"[WebSocket] {self.tag} closed.")self._ws = Noneevents.request.fire(request_type="ws_client",name="close",response_time=0,response_length=0,)def set_on_message(self, on_message: Callable):self._ws.on_message = on_messagedef send(self, message: str):if self._ws:self._ws.send(data=message)if self.log_messages:self._sent_messages.append(message)logger.debug(f"[WebSocket] {self.tag} message sent: {message}")else:logger.warning(f"No active [WebSocket] {self.tag} connection established.")raise ConnectionError("No active [WebSocket] connection established.")def clear(self):self._recv_messages = []self._sent_messages = []self.messages = []def expect_messages(self,matcher: Callable[..., bool],count: int = 1,timeout: int = 10,interval: int = 1,) -> list:"""Expect to receive one or more filtered messages.Args:matcher (Callable): A matcher function used to filter the received messages.count (int, optional): Number of messages to be expected before timeout. Defaults to 1.timeout (int, optional): Timeout in seconds. Defaults to 10.interval (int, optional): Interval in seconds. Defaults to 1.Returns:list: A list of messages filtered by the matcher."""deadline: float = time.time() + timeoutresult: list = []  # messages filtered by the matcherseen: list = []  # messages already seen by the matcher to be excluded from further matchingwhile time.time() < deadline:snapshot: list = [*self._recv_messages]for element in seen:if element in snapshot:snapshot.remove(element)result.extend(filter(matcher, snapshot))if len(result) >= count:breakseen.extend(snapshot)time.sleep(interval)if len(result) < count:logger.warning(f"({self.tag}) Expected to receive {count} messages, but received only {len(result)} messages.")return result
2. 如何使用
class PrivateWsUser(User):def on_start(self):self.ws_client=WebSocketClient("wss://abc.pp.com/chat", log_message=True)self.ws_client.connect()@taskdef send_hello()self.ws_client.send("hello world")
3. 扩展

可自行扩展on_message 方法,上面的on_message 方法是json 格式的信息处理

相关文章:

性能测试 - Locust WebSocket client

Max.Bai 2024.10 0. 背景 Locust 是性能测试工具&#xff0c;但是默认只支持http协议&#xff0c;就是默认只有http的client&#xff0c;需要其他协议的测试必须自己扩展对于的client&#xff0c;比如下面的WebSocket client。 1. WebSocket test Client “”“ Max.Bai W…...

html中鼠标位置信息

pageX&#xff1a;鼠标距离页面的最左边的距离&#xff0c;包括滚动条的长度。clientX&#xff1a;鼠标距离浏览器视口的左距离&#xff0c;不包括滚动条。offsetX&#xff1a;鼠标到事件源左边的距离。movementX&#xff1a;鼠标这次触发的事件的位置相对于上一次触发事件的位…...

kubernetes v1.29.XX版本HPA、KPA、VPA并压力测试

序言&#xff1a; 在大型电商、购物、直播活动期间&#xff0c;对于火爆流量的激增&#xff0c;如何保障业务稳定并且做到资源不浪费&#xff0c;自动回收。 场景&#xff1a;kubernetes 原生容器化承载业务流量&#xff08;非云环境&#xff09; 方案&#xff1a;kubernetes自…...

flutter 常用UI组件

文章目录 1. Toast 文本提示框oktoastbot_toast2. loading 加载窗flutter_easyloading3. 对话框gex dialog4.下拉刷新pull_to_refresh5. pop 窗custom_pop_up_menu6. pin code 密码框pinput7. 二维码qr_flutter8. swiper 滚动组件carousel_sliderflutter_swiper_view9. Badge 角…...

HarmonyOS NEXT应用开发边学边玩系列:从零实现一影视APP (五、电影详情页的设计实现)

在上一篇文章中&#xff0c;完成了电影列表页的开发。接下来&#xff0c;将进入电影详情页的设计实现阶段。这个页面将展示电影的详细信息&#xff0c;包括电影海报、评分、简介以及相关影人等。将使用 HarmonyOS 提供的常用组件&#xff0c;并结合第三方库 nutpi/axios 来实现…...

hive表修改字段类型没有级连导致历史分区报错

一&#xff1a;问题背景 修改hive的分区表时有级连概念&#xff0c;指字段的最新状态&#xff0c;默认只对往后的分区数据生效&#xff0c;而之前的分区保留历史元数据状态。好处就是修改语句的效率很快&#xff0c;坏处就是如果历史分区的数据还有用&#xff0c;那就回发生分…...

云上贵州多彩宝荣获仓颉社区先锋应用奖 | 助力数字政务新突破

在信息技术应用创新的浪潮中&#xff0c;仓颉社区吸引了众多企业和开发者的积极参与&#xff0c;已有多个应用成功落地&#xff0c;展现出蓬勃的创新活力。仓颉编程语言精心遴选了在社区建设、应用创新、开源共建、技术布道等方面做出突出贡献的优秀项目应用&#xff0c;并颁发…...

JS宏进阶:JS宏中的文件系统FileSystem

FileSystem对象中包含文件和文件夹的一些基本和常见的操作接口。比如&#xff1a;判断路径是否存在、创建文件夹、创建文件、读取文件等等。他的出现可以取代文件流对txt或csv等文件的操作。官方文档网址&#xff1a;https://open.wps.cn/previous/docs/client/wpsLoad&#xf…...

XML序列化和反序列化的学习

1、基本介绍 在工作中&#xff0c;经常为了调通上游接口&#xff0c;从而对请求第三方的参数进行XML序列化&#xff0c;这里常使用的方式就是使用JAVA扩展包中的相关注解和类来实现xml的序列化和反序列化。 2、自定义工具类 import javax.xml.bind.JAXBContext; import javax.x…...

npm ERR! code CERT_HAS_EXPIRED

很不幸看到这个提示。 查了很多网上的解决方案&#xff0c;都提到一个解决方案&#xff1a; npm install -g npmlatest 靠就是执行install报的错&#xff0c;你要我通过install来解决这个问题。可见大多数人都是转发&#xff0c;从不自己试试。 第二个是看系统时间。这个基…...

30分钟内搭建一个全能轻量级springboot 3.4 + 脚手架 <5> 5分钟集成好caffeine并使用注解操作缓存

快速导航 <1> 5分钟快速创建一个springboot web项目 <2> 5分钟集成好最新版本的开源swagger ui&#xff0c;并使用ui操作调用接口 <3> 5分钟集成好druid并使用druid自带监控工具监控sql请求 <4> 5分钟集成好mybatisplus并使用mybatisplus generator自…...

【设计模式-结构型】装饰器模式

一、什么是装饰器模式 装饰器模式&#xff08;Decorator Pattern&#xff09;是一种结构型设计模式&#xff0c;它的核心思想是在不改变原有对象结构的情况下&#xff0c;动态地给对象增加一些功能&#xff0c;从而达到扩展功能的目的。举个例子&#xff0c;今天在家妈妈给蒸馒…...

分布式数据存储基础与HDFS操作实践(副本)

以下为作者本人撰写的报告&#xff0c;步骤略有繁琐&#xff0c;不建议作为参考内容&#xff0c;可以适当浏览&#xff0c;进一步理解。 一、实验目的 1、理解分布式文件系统的基本概念和工作原理。 2、掌握Hadoop分布式文件系统&#xff08;HDFS&#xff09;的基本操作。 …...

Linux 进程前篇(冯诺依曼体系结构和操作系统)

目录 一.冯诺依曼体系结构 1.概念 2.硬件层面的数据流 3.总结加补充 二.操作系统 (Operating System) 1.概念 2.设计OS的目的 3.定位 4.操作系统的管理 5.计算机体系的层状结构 在我们认识进程之前&#xff0c;我们先了解什么是冯诺依曼体系结构 一.冯诺依曼体系结构…...

Springboot Redisson 分布式锁、缓存、消息队列、布隆过滤器

redisson-spring-boot-starter 是 Redisson 提供的 Spring Boot 集成包&#xff0c;旨在简化与 Redis 的交互&#xff0c;包括分布式锁、缓存、消息队列、布隆过滤器等功能的实现。 Maven 依赖 在 Spring Boot 项目中添加 redisson-spring-boot-starter 依赖&#xff1a; <…...

【C语言】_字符串拷贝函数strcpy

目录 1. 函数声明及功能 2. 使用示例 3. 注意事项 4. 模拟实现 4.1 第一版&#xff1a;基本功能判空const修饰 4.2 第二版&#xff1a;优化对于\0的单独拷贝 4.3 第三版&#xff1a;仿strcpy的char*返回值 1. 函数声明及功能 char * strcpy ( char * destination, cons…...

基于 Vue 的拖拽缩放卡片组件:实现思路、方法及使用指南

引言 在前端开发中&#xff0c;实现可交互的组件能够极大地提升用户体验。本文将介绍一个基于 Vue 封装的可缩放卡片组件&#xff0c;从实现思路、代码具体实现以及使用方法等方面进行详细阐述&#xff0c;帮助开发者更好地理解和运用这一组件。项目源码地址&#xff1a;https…...

nginx 实现 正向代理、反向代理 、SSL(证书配置)、负载均衡 、虚拟域名 ,使用其他中间件监控

我们可以详细地配置 Nginx 来实现正向代理、反向代理、SSL、负载均衡和虚拟域名。同时&#xff0c;我会介绍如何使用一些中间件来监控 Nginx 的状态和性能。 1. 安装 Nginx 如果你还没有安装 Nginx&#xff0c;可以通过以下命令进行安装&#xff08;以 Ubuntu 为例&#xff0…...

Kafka客户端-“远程主机强迫关闭了一个现有的连接”故障排查及解决

Kafka客户端-“远程主机强迫关闭了一个现有的连接”故障排查及解决 1. 故障现象 Kafka客户端发送数据时&#xff0c;出现“远程主机强迫关闭了一个现有的连接”错误&#xff0c;导致数据发送失败。错误信息如下&#xff1a; 2. 故障排查 【1】. 查看服务网络状态 出现故障…...

Node.js - Express框架

1. 介绍 Express 是一个基于 Node.js 的 Web 应用程序框架&#xff0c;主要用于快速、简便地构建 Web 应用程序 和 API。它是目前最流行的 Node.js Web 框架之一&#xff0c;具有轻量级、灵活和功能丰富的特点。 核心概念包括路由&#xff0c;中间件&#xff0c;请求与响应&a…...

投资组合优化中的常见陷阱:如何用LINGO和MATLAB避免风险计算错误

投资组合优化中的常见陷阱&#xff1a;如何用LINGO和MATLAB避免风险计算错误 在金融投资领域&#xff0c;优化投资组合是实现收益最大化和风险最小化的关键手段。然而&#xff0c;许多金融分析师和量化投资爱好者在实际操作中常常陷入各种计算陷阱&#xff0c;导致结果偏离预期…...

3步重塑:foobox-cn让您的foobar2000音乐体验焕然一新

3步重塑&#xff1a;foobox-cn让您的foobar2000音乐体验焕然一新 【免费下载链接】foobox-cn DUI 配置 for foobar2000 项目地址: https://gitcode.com/GitHub_Trending/fo/foobox-cn 还在为音乐播放器单调乏味的界面而苦恼吗&#xff1f;foobox-cn是专为foobar2000设计…...

资源提取高效解析与跨设备管理:猫抓浏览器扩展的技术实践

资源提取高效解析与跨设备管理&#xff1a;猫抓浏览器扩展的技术实践 【免费下载链接】cat-catch 猫抓 浏览器资源嗅探扩展 / cat-catch Browser Resource Sniffing Extension 项目地址: https://gitcode.com/GitHub_Trending/ca/cat-catch 在数字化内容爆炸的今天&…...

告别手动爆肝:用AiScan-N自动化你的CTF Web漏洞测试(SQL注入/文件上传实战)

智能渗透测试革命&#xff1a;AiScan-N在CTF中的实战应用与效率跃升 当凌晨三点的CTF比赛进入白热化阶段&#xff0c;你的眼皮开始打架&#xff0c;而对手却像永动机般不断提交flag——这种场景下&#xff0c;传统手动渗透测试的局限性暴露无遗。我曾亲眼见证一位资深红队成员…...

保姆级教程:手把手教你用Zabbix监控MySQL数据库(Percona模板实战)

深度实战&#xff1a;基于Percona模板构建企业级MySQL监控体系 当数据库规模突破百万级QPS时&#xff0c;传统的手动检查方式就像用体温计测量森林大火——既低效又危险。去年某电商大促期间&#xff0c;我们曾因未及时发现连接数耗尽导致核心交易库雪崩&#xff0c;这个教训让…...

Bluesky AI助手Attie:用户不满下的未来挑战

Attie&#xff1a;定制化社交媒体动态新尝试Bluesky正在开发的新型AI助手Attie&#xff0c;以AT协议命名&#xff0c;可创建定制化的社交媒体动态。它作为一个独立的可选应用程序&#xff0c;目前处于仅限受邀用户参与的封闭测试阶段。其目标是打造一个比单纯搜索话题更全面的时…...

完整贡献指南:如何为endoflife.date添加新的产品支持信息

完整贡献指南&#xff1a;如何为endoflife.date添加新的产品支持信息 【免费下载链接】endoflife.date Informative site with EoL dates of everything 项目地址: https://gitcode.com/gh_mirrors/en/endoflife.date 你是否想为开源项目贡献自己的力量&#xff0c;但不…...

用51单片机+Proteus仿真,从零到一复刻一个数码管电子钟(附完整代码和电路图)

从零构建51单片机数码管电子钟&#xff1a;Proteus仿真与实战全解析 数码管电子钟作为单片机入门经典项目&#xff0c;能系统训练定时器、中断、数码管驱动等核心技能。但很多初学者在独立实现时&#xff0c;常遇到仿真效果不稳定、显示闪烁或计时不准等问题。本文将用保姆级教…...

PyTorch 2.5 + Jupyter 开发环境搭建:5分钟搞定AI模型训练与调试

PyTorch 2.5 Jupyter 开发环境搭建&#xff1a;5分钟搞定AI模型训练与调试 1. 环境准备与快速部署 PyTorch 2.5作为当前最流行的深度学习框架之一&#xff0c;其开箱即用的特性让AI开发变得前所未有的简单。我们将使用预配置好的PyTorch-CUDA基础镜像&#xff0c;快速搭建完…...

告别setData!用mobx-miniprogram+miniprogram-computed重构你的小程序状态管理(保姆级避坑指南)

重构小程序状态管理&#xff1a;mobx-miniprogram与miniprogram-computed实战指南 如果你正在开发一个功能逐渐复杂的中大型微信小程序&#xff0c;大概率已经遇到了这样的困境&#xff1a;页面间状态共享越来越混乱&#xff0c;setData调用遍布各个角落&#xff0c;视图更新性…...