如何确定RocketMQ中消费者的线程大小
背景
随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求进行缓冲,RocketMQ就是一款非常优秀消息队列的中间件,在互联网领域久经考验,也被各个行业广泛应用。
在上一篇文章中,介绍了RocketMQ的工作原理使用。物联网中如何使用RockeMQ
如何在配置文件端配置消费者线程大小
当生产者将大量的消息堆积到消息队列中时,需要同步启用消费者去消费队列中的消息,达到动态平衡的效果。
如下代码所示,在消费者类上,会使用@RocketMQMessageListener注解,并填写必要的属性:consumerThreadNumber:消费线程、主题、消费组。
@RocketMQMessageListener(// 指定消费线程大小consumeThreadNumber = 16,topic = TOPIC_DEMO,consumerGroup = "consumer_demo_group")
@Component
public class Consumer implements RocketMQListener<MessageExt> {private final String CHARSET = Charset.defaultCharset().name();@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String str = new String(body, Charset.forName(this.CHARSET));System.out.println("消费者消费的消息为: " + str);}
}
其中的topic、consumerGroup可以指定一次就不会变啦,但是consumerThreadNumber会根据机器的性能发生变化;因此需要将其提出放到配置文件中,方便修改,比如"application.yaml"。
那应该如何实现呢?
其中 consumerThreadNumber = 16,表明填入的是一个static的变量,因此如果简单地使用@Value来进行注入变量是不成功的,因为它只能注入非静态变量。为了能实现从配置文件中读取变量,并转为static变量,采用了显示set方式赋值变量的方法。
/**
* 注入mq消费的线程数量
*/
public static int RocketMQThreadSize;@Value("${biz.RocketMQThreadSize}")
public void setRocketMQThreadSize(int threadSize) {
RocketMQThreadSize = threadSize;
}
那在配置文件中就可以配置RocketMQThreadSize的大小啦。如下在application.yaml就可以搞定。
biz: RocketMQThreadSize: 200
如何使得自定义的线程大小生效
如上一章所示,可以得到静态的变量,那如何在消费者中生效呢?幸好RocketMQ提供一个接口可以实现消费者线程的自定义。
在消费者的类需要实现RocketMQPushConsumerLifecycleListener接口即可,然后在类中实现prepareStart方法即可。具体如下所示:
@RocketMQMessageListener(topic = TOPIC_DEMO,consumerGroup = "consumer_demo_group")
@Component
public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {private final String CHARSET = Charset.defaultCharset().name();@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String str = new String(body, Charset.forName(this.CHARSET));System.out.println("消费者消费的消息为: " + str);}@Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {// 指定消费线程大小defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);}
}
在prepareStart方法中,指定一些必要的线程参数
- 最大线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize); - 最小线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
并且通过实验和查看源码,其中最大、最小设置一样才会生效。
如何确定合适的线程大小
以上的步骤已经帮忙把线程设置提取出来啦,之后只需在配置文件中修改线程数大小,而不需去代码中修改,避免代码导致系统出现问题。那如何去确定线程的数量大小呢?
线程是计算机执行任务的基本的单位,即消费任务可以交给线程去执行。
当线程数量较少时,CPU性能不能充分发挥。但是线程数量过的就会导致过多的线程处于等待中,机器的负载升高。因此需要确定适合当前机器的线程数量。
在RocketMQ线程调优有两个指标可以帮助大致确定消费线程大小:
- 消费者的
TPS,表明消费者的能力; - 机器负载,分为
CPU负载和IO负载,和自身的核心数量有关。
RocketMQ提供web界面,可以监测TPS的大小,这个数量当然是越大越好,但是也要考虑负载。

在服务器输入top命令就可以看大,当前机器的负载:

分别为1、5、15分钟负载。
在进行压测的时候,需要知道机器的核心数量,监测负载的时候负载的大小就不能超过核心数量。
在测试的时候可以从小到大调节线程数大小,并且关注TPS和负载。
结尾
以上就是确定消费者线程大小的整个过程,有疑问欢迎留言交流!!!
线程介绍
相关文章:
如何确定RocketMQ中消费者的线程大小
背景 随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求…...
OpenAPI SDK组件之Spring Aop源码拓展
Spring Aop 看这个分享的应该都用过Spring Aop,这里就不再过多介绍了它是什么了。 我抽取了Spring Aop的部分源码,通过它实现请求参数可变拦截,同时apisdk离开Spring框架,仍然可以正常运行。 讲拦截也好,通知也罢&a…...
蓝桥杯C/C++VIP试题每日一练之龟兔赛跑预测
💛作者主页:静Yu 🧡简介:CSDN全栈优质创作者、华为云享专家、阿里云社区博客专家,前端知识交流社区创建者 💛社区地址:前端知识交流社区 🧡博主的个人博客:静Yu的个人博客…...
为你的Vue2.x老项目安装Vite发动机吧
天下苦webpack久矣,相信作为前端开发者一定经历过在项目迭代时间较长的时候经历漫长等待的这一过程,每一次保存都会浪费掉大量时间,这是webpack这种机制所带来的问题。 于是,尤大为我们带来了新一代前端构建工具:vite…...
ZCMU--5012: 铺设道路(差分思路)
Description 春春是一名道路工程师,负责铺设一条长度为 n 的道路。 铺设道路的主要工作是填平下陷的地表。 整段道路可以看作是 n 块首尾相连的区域,一开始,第 i 块区域下陷的深度为 di。 春春每天可以选择一段连续区间 [L,R]&…...
算法模板总结(自用)
算法模板总结滑动窗口双指针算法数组相关合并两个有序数组左右指针技巧快慢指针技巧字符串相关左右指针反转字符串问题快慢指针替换空格字符问题链表相关快慢双指针删除链表的倒数第N个节点链表相交环形链表链表操作几数之和两数之和四个数组的四数之和三数之和同一数组中四数之…...
【架构师】零基础到精通——架构发展
博客昵称:架构师Cool 最喜欢的座右铭:一以贯之的努力,不得懈怠的人生。 作者简介:一名Coder,软件设计师/鸿蒙高级工程师认证,在备战高级架构师/系统分析师,欢迎关注小弟! 博主小留言…...
C++(20):三路比较运算符
C20增加了三路比较运算符<>(戏称航天飞机运算符),用于对类的比较运算符进行统一的设计。有两种使用方式:默认比较对于某些类,如果按照其成员逐一比较即可决定比较运算符的值,那么可以使用默认的三路运…...
MySQL workbench 字符集、字符序的概念与联系
在数据的存储上,MySQL提供了不同的字符集支持。而在数据的对比操作上,则提供了不同的字符序支持。 MySQL提供了不同级别的设置,包括server级、database级、table级、column级,可以提供非常精准的设置。 什么是字符集、字符序&am…...
DBA之路---数据库启动与关闭过程
DBA之路—数据库启动与关闭过程 1、启动过程 oracle启动的四个状态 shutdown、就是数据库关闭状态。 nomount模式 #启动instance ,读取参数文件、分配sga空间启动后台进程,打开alter日志和其他trace文件startup nomount #该模式下只会创建实例并不加…...
Shell文件包含
和其他语言一样,Shell 也可以包含外部脚本。这样可以很方便的封装一些公用的代码作为一个独立的文件。 一、语法格式 Shell 文件包含的语法格式如下: . filename # 注意点号(.)和文件名中间有一空格 或 source filename 在当前bash环境下读取并执行file…...
计算机网络(六): HTTP,HTTPS,DNS,网页解析全过程
文章目录一、HTTP头部包含的信息通用头部请求头部响应头部实体头部二、Keep-Alive和非Keep-Alive的区别三、HTTP的方法四、HTTP和HTTPS建立连接的过程4.1 HTTP4.2 HTTPS五、HTTP和HTTPS的区别六、HTTPS的加密方式七、cookie和sessionsessioncookie八、HTTP状态码状态码200&…...
Android仿京东金融的数值滚动尺功能
自定义数值滚动尺,这个用的还是挺多的,例如京东金融的通过滚动尺选择金额等,而这次就是高仿京东金融的数值滚动尺。首先看看下效果图,如下:首先先给你们各个变量的含义,以免在后面的讲解中不知变量的意思,代码如下://最…...
Nginx 和 Tomcat 实现负载均衡
Nginx 和 tomcat 实现负载均衡 🏆荣誉认证:51CTO博客专家博主、TOP红人、明日之星;阿里云开发者社区专家博主、技术博主、星级博主。 💻微信公众号:微笑的段嘉许 📌本文由微笑的段嘉许原创! &am…...
【万能排序之qsort、b_sort 、s_sort】
文章目录前言:star:qsort函数函数参数qsort函数的使用:star:模拟实现万冒泡排序函数参数模拟实现b_sort注意点:star:模拟实现万能选择排序函数参数模拟实现s_sort最后前言 我们所熟悉的冒泡排序,选择排序,插入排序,二分排序等都是基于给定的一…...
利用InceptionV3实现图像分类
最近在做一个机审的项目,初步希望实现图像的四分类,即:正常(neutral)、涉政(political)、涉黄(porn)、涉恐(terrorism)。有朋友给推荐了个github上…...
【Java】CAS锁
一、什么是CAS机制(compare and swap) 1.概述 CAS的全称为Compare-And-Swap,直译就是对比交换。是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值。经过调查发现,…...
Linux服务器配置系统安全加固方法
1. SSH空闲超时时间建议为: 600-900 解决方案: 在【/etc/ssh/sshd_config】文件中设置【ClientAliveInterval】设置为600到900之间 vim /etc/ssh/sshd_config #将 ClientAliveInterval 参数值设置为 900 2. 修改检查SSH密码修改最小间隔 解决方案: 在【/etc/login.defs】文件…...
Codeforces Round #850 (Div. 2, based on VK Cup 2022 - Final Round)(A~E)
t宝酱紫喜欢出这种分类讨论的题?!A1. Non-alternating Deck (easy version)给出n张牌,按照题目给的顺序分给两人,问最后两人手中各有几张牌。思路:模拟。AC Code:#include <bits/stdc.h>typedef long…...
qt源码--信号槽
本篇主要从Qt信号槽的连接、断开、调用、对象释放等方面展开; 1.信号建立连接过程 connect有多个重载函数,主要是为了方便使用者,比较常用的有2种方式: a. QObject::connect(&timer, &QTimer::timeout, &loop, &am…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建
制造业采购供应链管理是企业运营的核心环节,供应链协同管理在供应链上下游企业之间建立紧密的合作关系,通过信息共享、资源整合、业务协同等方式,实现供应链的全面管理和优化,提高供应链的效率和透明度,降低供应链的成…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍
文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结: 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析: 实际业务去理解体会统一注…...
unix/linux,sudo,其发展历程详细时间线、由来、历史背景
sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...
JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作
一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
如何在网页里填写 PDF 表格?
有时候,你可能希望用户能在你的网站上填写 PDF 表单。然而,这件事并不简单,因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件,但原生并不支持编辑或填写它们。更糟的是,如果你想收集表单数据ÿ…...
