如何确定RocketMQ中消费者的线程大小
背景
随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求进行缓冲,RocketMQ就是一款非常优秀消息队列的中间件,在互联网领域久经考验,也被各个行业广泛应用。
在上一篇文章中,介绍了RocketMQ的工作原理使用。物联网中如何使用RockeMQ
如何在配置文件端配置消费者线程大小
当生产者将大量的消息堆积到消息队列中时,需要同步启用消费者去消费队列中的消息,达到动态平衡的效果。
如下代码所示,在消费者类上,会使用@RocketMQMessageListener注解,并填写必要的属性:consumerThreadNumber:消费线程、主题、消费组。
@RocketMQMessageListener(// 指定消费线程大小consumeThreadNumber = 16,topic = TOPIC_DEMO,consumerGroup = "consumer_demo_group")
@Component
public class Consumer implements RocketMQListener<MessageExt> {private final String CHARSET = Charset.defaultCharset().name();@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String str = new String(body, Charset.forName(this.CHARSET));System.out.println("消费者消费的消息为: " + str);}
}
其中的topic、consumerGroup可以指定一次就不会变啦,但是consumerThreadNumber会根据机器的性能发生变化;因此需要将其提出放到配置文件中,方便修改,比如"application.yaml"。
那应该如何实现呢?
其中 consumerThreadNumber = 16,表明填入的是一个static的变量,因此如果简单地使用@Value来进行注入变量是不成功的,因为它只能注入非静态变量。为了能实现从配置文件中读取变量,并转为static变量,采用了显示set方式赋值变量的方法。
/**
* 注入mq消费的线程数量
*/
public static int RocketMQThreadSize;@Value("${biz.RocketMQThreadSize}")
public void setRocketMQThreadSize(int threadSize) {
RocketMQThreadSize = threadSize;
}
那在配置文件中就可以配置RocketMQThreadSize的大小啦。如下在application.yaml就可以搞定。
biz: RocketMQThreadSize: 200
如何使得自定义的线程大小生效
如上一章所示,可以得到静态的变量,那如何在消费者中生效呢?幸好RocketMQ提供一个接口可以实现消费者线程的自定义。
在消费者的类需要实现RocketMQPushConsumerLifecycleListener接口即可,然后在类中实现prepareStart方法即可。具体如下所示:
@RocketMQMessageListener(topic = TOPIC_DEMO,consumerGroup = "consumer_demo_group")
@Component
public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {private final String CHARSET = Charset.defaultCharset().name();@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String str = new String(body, Charset.forName(this.CHARSET));System.out.println("消费者消费的消息为: " + str);}@Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {// 指定消费线程大小defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);}
}
在prepareStart方法中,指定一些必要的线程参数
- 最大线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize); - 最小线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
并且通过实验和查看源码,其中最大、最小设置一样才会生效。
如何确定合适的线程大小
以上的步骤已经帮忙把线程设置提取出来啦,之后只需在配置文件中修改线程数大小,而不需去代码中修改,避免代码导致系统出现问题。那如何去确定线程的数量大小呢?
线程是计算机执行任务的基本的单位,即消费任务可以交给线程去执行。
当线程数量较少时,CPU性能不能充分发挥。但是线程数量过的就会导致过多的线程处于等待中,机器的负载升高。因此需要确定适合当前机器的线程数量。
在RocketMQ线程调优有两个指标可以帮助大致确定消费线程大小:
- 消费者的
TPS,表明消费者的能力; - 机器负载,分为
CPU负载和IO负载,和自身的核心数量有关。
RocketMQ提供web界面,可以监测TPS的大小,这个数量当然是越大越好,但是也要考虑负载。

在服务器输入top命令就可以看大,当前机器的负载:

分别为1、5、15分钟负载。
在进行压测的时候,需要知道机器的核心数量,监测负载的时候负载的大小就不能超过核心数量。
在测试的时候可以从小到大调节线程数大小,并且关注TPS和负载。
结尾
以上就是确定消费者线程大小的整个过程,有疑问欢迎留言交流!!!
线程介绍
相关文章:
如何确定RocketMQ中消费者的线程大小
背景 随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求…...
OpenAPI SDK组件之Spring Aop源码拓展
Spring Aop 看这个分享的应该都用过Spring Aop,这里就不再过多介绍了它是什么了。 我抽取了Spring Aop的部分源码,通过它实现请求参数可变拦截,同时apisdk离开Spring框架,仍然可以正常运行。 讲拦截也好,通知也罢&a…...
蓝桥杯C/C++VIP试题每日一练之龟兔赛跑预测
💛作者主页:静Yu 🧡简介:CSDN全栈优质创作者、华为云享专家、阿里云社区博客专家,前端知识交流社区创建者 💛社区地址:前端知识交流社区 🧡博主的个人博客:静Yu的个人博客…...
为你的Vue2.x老项目安装Vite发动机吧
天下苦webpack久矣,相信作为前端开发者一定经历过在项目迭代时间较长的时候经历漫长等待的这一过程,每一次保存都会浪费掉大量时间,这是webpack这种机制所带来的问题。 于是,尤大为我们带来了新一代前端构建工具:vite…...
ZCMU--5012: 铺设道路(差分思路)
Description 春春是一名道路工程师,负责铺设一条长度为 n 的道路。 铺设道路的主要工作是填平下陷的地表。 整段道路可以看作是 n 块首尾相连的区域,一开始,第 i 块区域下陷的深度为 di。 春春每天可以选择一段连续区间 [L,R]&…...
算法模板总结(自用)
算法模板总结滑动窗口双指针算法数组相关合并两个有序数组左右指针技巧快慢指针技巧字符串相关左右指针反转字符串问题快慢指针替换空格字符问题链表相关快慢双指针删除链表的倒数第N个节点链表相交环形链表链表操作几数之和两数之和四个数组的四数之和三数之和同一数组中四数之…...
【架构师】零基础到精通——架构发展
博客昵称:架构师Cool 最喜欢的座右铭:一以贯之的努力,不得懈怠的人生。 作者简介:一名Coder,软件设计师/鸿蒙高级工程师认证,在备战高级架构师/系统分析师,欢迎关注小弟! 博主小留言…...
C++(20):三路比较运算符
C20增加了三路比较运算符<>(戏称航天飞机运算符),用于对类的比较运算符进行统一的设计。有两种使用方式:默认比较对于某些类,如果按照其成员逐一比较即可决定比较运算符的值,那么可以使用默认的三路运…...
MySQL workbench 字符集、字符序的概念与联系
在数据的存储上,MySQL提供了不同的字符集支持。而在数据的对比操作上,则提供了不同的字符序支持。 MySQL提供了不同级别的设置,包括server级、database级、table级、column级,可以提供非常精准的设置。 什么是字符集、字符序&am…...
DBA之路---数据库启动与关闭过程
DBA之路—数据库启动与关闭过程 1、启动过程 oracle启动的四个状态 shutdown、就是数据库关闭状态。 nomount模式 #启动instance ,读取参数文件、分配sga空间启动后台进程,打开alter日志和其他trace文件startup nomount #该模式下只会创建实例并不加…...
Shell文件包含
和其他语言一样,Shell 也可以包含外部脚本。这样可以很方便的封装一些公用的代码作为一个独立的文件。 一、语法格式 Shell 文件包含的语法格式如下: . filename # 注意点号(.)和文件名中间有一空格 或 source filename 在当前bash环境下读取并执行file…...
计算机网络(六): HTTP,HTTPS,DNS,网页解析全过程
文章目录一、HTTP头部包含的信息通用头部请求头部响应头部实体头部二、Keep-Alive和非Keep-Alive的区别三、HTTP的方法四、HTTP和HTTPS建立连接的过程4.1 HTTP4.2 HTTPS五、HTTP和HTTPS的区别六、HTTPS的加密方式七、cookie和sessionsessioncookie八、HTTP状态码状态码200&…...
Android仿京东金融的数值滚动尺功能
自定义数值滚动尺,这个用的还是挺多的,例如京东金融的通过滚动尺选择金额等,而这次就是高仿京东金融的数值滚动尺。首先看看下效果图,如下:首先先给你们各个变量的含义,以免在后面的讲解中不知变量的意思,代码如下://最…...
Nginx 和 Tomcat 实现负载均衡
Nginx 和 tomcat 实现负载均衡 🏆荣誉认证:51CTO博客专家博主、TOP红人、明日之星;阿里云开发者社区专家博主、技术博主、星级博主。 💻微信公众号:微笑的段嘉许 📌本文由微笑的段嘉许原创! &am…...
【万能排序之qsort、b_sort 、s_sort】
文章目录前言:star:qsort函数函数参数qsort函数的使用:star:模拟实现万冒泡排序函数参数模拟实现b_sort注意点:star:模拟实现万能选择排序函数参数模拟实现s_sort最后前言 我们所熟悉的冒泡排序,选择排序,插入排序,二分排序等都是基于给定的一…...
利用InceptionV3实现图像分类
最近在做一个机审的项目,初步希望实现图像的四分类,即:正常(neutral)、涉政(political)、涉黄(porn)、涉恐(terrorism)。有朋友给推荐了个github上…...
【Java】CAS锁
一、什么是CAS机制(compare and swap) 1.概述 CAS的全称为Compare-And-Swap,直译就是对比交换。是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值。经过调查发现,…...
Linux服务器配置系统安全加固方法
1. SSH空闲超时时间建议为: 600-900 解决方案: 在【/etc/ssh/sshd_config】文件中设置【ClientAliveInterval】设置为600到900之间 vim /etc/ssh/sshd_config #将 ClientAliveInterval 参数值设置为 900 2. 修改检查SSH密码修改最小间隔 解决方案: 在【/etc/login.defs】文件…...
Codeforces Round #850 (Div. 2, based on VK Cup 2022 - Final Round)(A~E)
t宝酱紫喜欢出这种分类讨论的题?!A1. Non-alternating Deck (easy version)给出n张牌,按照题目给的顺序分给两人,问最后两人手中各有几张牌。思路:模拟。AC Code:#include <bits/stdc.h>typedef long…...
qt源码--信号槽
本篇主要从Qt信号槽的连接、断开、调用、对象释放等方面展开; 1.信号建立连接过程 connect有多个重载函数,主要是为了方便使用者,比较常用的有2种方式: a. QObject::connect(&timer, &QTimer::timeout, &loop, &am…...
做网安的这几年,挖漏洞接私活赚的是我工资的3倍,这些门道没几人知道
前言 这是我做网络安全工程师(简称网安)的第9个年头,从我工作的第3年起,我就一直在开始尝试去接网安方面的私活,这6年平均下来,我接私活赚的钱几乎是我工资的3倍。 而很多人要么不敢去做,要么就…...
ElevenLabs高棉文语音突然失效?2024年Q2政策更新后,这6类柬埔寨手机号注册账号已被静默降权
更多请点击: https://codechina.net 第一章:ElevenLabs高棉文语音的基本能力与本地化适配现状 ElevenLabs 作为全球领先的AI语音合成平台,自2023年Q4起逐步支持高棉语(Khmer,语言代码:km-KH)&a…...
如何用嘎嘎降AI处理汉语言文学论文:文学类毕业论文降AI免费完整操作教程
如何用嘎嘎降AI处理汉语言文学论文:文学类毕业论文降AI免费完整操作教程 帮同学处理过汉语言文学论文降AI教程,流程基本是固定的,记录下来供参考。 主推工具:嘎嘎降AI(www.aigcleaner.com),4.…...
Claude Code 总被封号怎么办,用 Taotoken 稳定接入大模型服务
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 Claude Code 总被封号怎么办,用 Taotoken 稳定接入大模型服务 许多开发者在日常工作中依赖 Claude Code 作为编程助手&…...
为什么你的ElevenLabs甘肃话输出像“普通话带口音”?5个声学特征参数调试错误导致92%失真率
更多请点击: https://kaifayun.com 第一章:甘肃话语音合成失真的现象学观察 甘肃话作为中原官话陇中片的代表性方言,其声调曲折性强、入声残留明显、连读变调规则复杂,为语音合成系统带来显著挑战。在部署基于Tacotron2WaveGlow的…...
3步找回密码:如何用ArchivePasswordTestTool解锁加密压缩包
3步找回密码:如何用ArchivePasswordTestTool解锁加密压缩包 【免费下载链接】ArchivePasswordTestTool 利用7zip测试压缩包的功能 对加密压缩包进行自动化测试密码 项目地址: https://gitcode.com/gh_mirrors/ar/ArchivePasswordTestTool 你是否曾经面对一个…...
5步彻底解决显卡驱动问题:Display Driver Uninstaller完整指南
5步彻底解决显卡驱动问题:Display Driver Uninstaller完整指南 【免费下载链接】display-drivers-uninstaller Display Driver Uninstaller (DDU) a driver removal utility / cleaner utility 项目地址: https://gitcode.com/gh_mirrors/di/display-drivers-unin…...
网易520发布会公布40余款游戏动态,新品精品化+AI应用成趋势
网易520发布会:多款新品游戏崭露头角5月20日,2026年『网易游戏520线上发布会』盛大举办,公布了40余款游戏及IP的最新动态。其中,《遗忘之海》官宣将于5月22日开启三测前瞻直播,并于28日正式开启测试。这款游戏出自《第…...
智能图像分层革命:5分钟将任何图片转换为可编辑PSD图层
智能图像分层革命:5分钟将任何图片转换为可编辑PSD图层 【免费下载链接】layerdivider A tool to divide a single illustration into a layered structure. 项目地址: https://gitcode.com/gh_mirrors/la/layerdivider 你是否曾面对一张精美的插画ÿ…...
杰理之RX修改为连接一个TX后需要再次按键或者其他操作才能连接第二个TX的功能需求【篇】
void user_wireless_dev_pair_code_pri() { y_printf(“user_wireless_dev_pair_code_pri”); u32 pair_code 0; wireless_dev_get_pair_code(“big_rx”, (u8 *)&pair_code, 1); wireless_dev_set_pair_code(“big_rx”, (u8 *)&pair_code); } //连接一个无线麦后&am…...
