RocketMQ批量发送消息❓
优点:
批量发送消息可以提高rocketmq的生产者性能和吞吐量。
使用场景:
- 发送大量小型消息时;
- 需要降低消息发送延迟时;
- 需要提高生产者性能时;
注意事项:
- 消息列表的大小不能超过broker设置的最大消息大小;
- 消息列表的大小不能超过生产证设置的maxMessageSize 参数,此参数默认为 4MB;
- 批量发送消息不支持消息事务;
- 如果代码在发送消息列表时发生异常,则可能会发生部分消息发送成功,部分消息发送失败的情况。如果要确保所有消息都已成功发送,则需要增加错误处理逻辑和消息重试机制;
批量发送消息为什么要限制maxMessageSize❓
消息列表的大小不能超过生产者设置的maxMessageSize参数,主要是为了避免消息发送延迟和消息过大导致broker出现性能问题。如果尝试发送大于maxMessageSize的消息,RocketMQ会抛出MessageTooLargeException异常,并且消息不会被发送到broker。
如果开发者在开发时遇到了消息列表大小超过maxMessageSize的情况,可以考虑以下几种处理方式:
-
- 提升maxMessageSize参数的大小,这样可以容纳更大的消息列表。但是,需要注意在提升参数大小时,要考虑到RocketMQ broker的性能和网络带宽等因素。
- 考虑将消息列表进行拆分,然后分批发送。这样可以避免一次发送过多的消息。
- 计算消息的大小并进行压缩。可以使用一些压缩算法,如 LZ4、Snappy 等,对消息进行压缩,以减小消息的大小。
- 对超过 maxMessageSize 的消息进行过滤或其他处理。可以通过业务逻辑对消息进行分组或分类,对超过 maxMessageSize 的消息进行过滤或其他处理,以避免发送超出限制的消息。
代码实现
package com.resource.sync.rocketmq;import java.util.Iterator;
import java.util.List;/*** @description:消息分割,在rocketmq中,一次性发送消息的长度不可超过4mb,此时我们需要进行切割,确保消息长度小于4mb**/
public class ListSplitter<T> implements Iterator<List<T>> {/*** 分割数据大小*/private int sizeLimit;/*** 分割数据列表*/private final List<T> messages;/*** 分割索引*/private int currIndex;public ListSplitter(int sizeLimit, List<T> messages) {this.sizeLimit = sizeLimit;this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<T> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {T t = messages.get(nextIndex);totalSize = totalSize + t.toString().length();if (totalSize > sizeLimit) {break;}}List<T> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}
private final int maxMessageSize = 1024 * 1024 * 4;/*** 消息分割(批量发送)*/private void bulkSendMsg(List<Message<String>> messageList) {// 限制数据大小ListSplitter splitter = new ListSplitter(maxMessageSize, messageList);while (splitter.hasNext()) {List<Message> nextList = splitter.next();syncBulkSendMessage("topic", nextList);}}/*** @param topic* @param list* @description:发送实时消息(批量)*/public void syncBulkSendMessage(String topic, List<Message> list) {SendResult sendResult = null;try {sendResult = rocketMQTemplate.syncSend(topic, list);if (sendResult.getSendStatus() != SendStatus.SEND_OK) {log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR.RESULT_STATUS:{},MSG_ID:{}", sendResult.getSendStatus(), sendResult.getMsgId());}if (sendResult.getSendStatus() == SendStatus.SEND_OK) {log.info("BULK_SEND_MSG_SUCCESS.MSG_ID:{}", sendResult.getMsgId());}} catch (Exception e) {log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR:{}", e);}}
相关文章:
RocketMQ批量发送消息❓
优点: 批量发送消息可以提高rocketmq的生产者性能和吞吐量。 使用场景: 发送大量小型消息时;需要降低消息发送延迟时;需要提高生产者性能时; 注意事项: 消息列表的大小不能超过broker设置的最大消息大小;消息列表…...
一键同步chromedriver版本
ChromeDriver是一个控制Chrome浏览器的驱动程序,它和Selenium一起被广泛用于Web自动化测试。然而,随着Chrome版本的升级,我们需要不断更新ChromeDriver以保持其与Chrome的兼容性。这个过程既费时又繁琐,而且对于非技术人员来说可能…...
Zephyr-7B-β :类GPT的高速推理LLM
Zephyr 是一系列语言模型,经过训练可以充当有用的助手。 Zephyr-7B-β 是该系列中的第二个模型,是 Mistralai/Mistral-7B-v0.1 的微调版本,使用直接偏好优化 (DPO) 在公开可用的合成数据集上进行训练 。 我们发现,删除这些数据集的…...
【笔试题】位运算
记录一些常见的位运算题: 1、实现对一个8bit数据(unsigned char类型)的指定位(例如第n位)置0或者置1操作,并保持其他地位不变。 unsigned char reg;/* 对第n位置0 */ reg &~ (1 << n);/* 对第n位…...
RT-Thread 10. 使用keil4编译GD32F450
1. 修改keil路径 2.增加MCU型号宏定义 3. 在ENV界面输入 scons -c scons --targetmdk44. 编译 scons --verbose提示错误 Warning: L6310W: Unable to find ARM libraries. Error: L6411E: No compatible library exists with a definition of startup symbol __main. Finish…...
Vue 跨域的两种解决方式
一、通过 proxy 解决跨域 1.1 baseURL 配置 对 axios 二次封装时,baseURL 设置为 /api。 const serviceAxios axios.create({baseURL: /api,timeout: 10000, // 请求超时设置withCredentials: false, // 跨域请求是否需要携带 cookie });1.2 vue.config.js 配置…...
【windows Docker 安装mysql:只需3条命令】
如下 docker pull mysql docker run --name mysql -p 3306:3306 -v D:/dockerFile/mysql/data:/var/lib/mysql/ -v D:/dockerFile/mysql/conf/my.cnf:/etc/mysql/my.cnf -e MYSQL_ROOT_PASSWORDroot -d mysql:latest --default-authentication-pluginmysql_native_password do…...
【软件逆向】如何逆向Unity3D+il2cpp开发的安卓app【IDA Pro+il2CppDumper+DnSpy+AndroidKiller】
教程背景 课程作业要求使用反编译技术,在游戏中实现无碰撞。正常情况下碰撞后角色死亡,修改为直接穿过物体不死亡。 需要准备的软件 il2CppDumper。DnSpy。IDA Pro。AndroidKiller。 一、使用il2CppDumper导出程序集 将{my_game}.apk后缀修改为{my_…...
vue3ref和reactive
Vue 3中的ref和reactive是两个重要的响应式API。 ref用于将基本数据类型转换为响应式数据,它返回一个包含value属性的响应式对象。ref适合用于单个值的响应式需求,例如计数器、表单数据等。示例代码: <template><div><p>…...
[架构之路-244]:目标系统 - 设计方法 - 软件工程 - 软件开发方法与软件开发模型
目录 一、软件开发方法:组织、管理、复用软件代码的方法 1.1 概述: 软件聚合的程度由简单到复杂 1.2 结构化的开发方法 1.3 面对对象的开发方法 1.4 面向组件的开发方法 1.5 面向服务的开发方法 1.6 不同开发方法比较:结构化、面对对象、面向组件…...
Matter 系列 #10|Matter 的证书吊销机制
乐鑫 Matter 系列文章 #10 在之前的多篇博客文章中,我们从不同方面介绍了 Matter,其中包括 Matter 的安全模型。简单回顾一下,Matter 的安全模型基于 PKI(即公钥基础设施)机制,可用于建立和管理数字证书、加…...
mybatis动态表名
1.基于mybatis官方文档 Configuration public class MybatisPlusConfig {Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor() {MybatisPlusInterceptor interceptor new MybatisPlusInterceptor();DynamicTableNameInnerInterceptor dynamicTableNameInnerIntercep…...
高校为什么需要大数据挖掘平台?
目前数据挖掘已经成为各种应用领域的重要技术,大学数据挖掘课程的开放已经出现。数据挖掘课程整合了多门学科知识。该课程包括各种理论知识,也离不开相关的实用技术。整个教学过程是培养和提高学生全面创新和解决问题的能力。过去,教学过程理…...
@Value的使用
在spring boot项目中,Value只能获取非静态变量,否则是null /*** cron"0 */1 * * * ?"*/ Value("${system.cron}") private String cron;/*** cron1null*/ Value("${system.cron}") private static String cron1;静态块获…...
用 Wireshark 在 Firefox 或 Google Chrome 上使用 SSLKEYLOGFILE 环境变量解密 SSL 流量
原文:这 您希望使用 SSL 会话密钥解密和检查 SSL 应用程序数据。 您希望在客户端系统上记录 SSL 会话密钥。 您正在客户端系统上使用 Firefox 或 Google Chrome 浏览器来访问 Web 应用程序。 注意:您还可以在客户端系统上使用 Microsoft Edge ÿ…...
京东大数据:2023年Q3美妆行业数据分析报告
近日,珀莱雅发布三季报,今年前三季度,公司实现营收52.49亿元,同比增长32.47%。分季度看,“618大促”所在Q2业绩增长最为亮眼,营收同比增速达到46.22%,进入Q3,在电商大促缺席情况下&a…...
[题] 改革春风吹满地 #图论 #多边形面积
题目 HDU 2036 改革春风吹满地 题解 参考博客:HDU 2036 改革春风吹满地 代码 #include<bits/stdc.h> using namespace std; const int N 110; //叉乘计算面积的公式,以(0,0)为起始点划分 int main() {int n;while(~scanf("%d", &…...
FPGA时序分析与约束(2)——时序电路时序
一、前言 在之前的内容中,我们介绍了组合电路的时序问题和可能导致的毛刺,强烈推荐在阅读前文的基础上再继续阅读本文, 前文链接:FPGA时序分析与约束(1)——组合电路时序 这篇文章中,我们将继续…...
明御安全网关任意文件上传漏洞复现
简介 安恒信息明御安全网关(NGFW) 秉持安全可视、简单有效的理念,以资产为视角的全流程防御的下一代安全防护体系,并融合传统防火墙、入侵防御系统、防病毒网关、上网行为管控、VPN网关、威胁情报等安全模块于一体的智慧化安全网关。 较低版本的系统存…...
JVM虚拟机:如何查看自己的JVM默认的垃圾回收器
只需要在程序运行的时候指定下面的参数就可以看到当前自己的JVM默认的垃圾回收器是什么?如下所示: 如上所示,默认使用的是G1回收器,这是我的电脑,因为我的电脑安装jdk的版本是1.9 如果你的jdk的版本是1.8,那…...
智慧医疗能源事业线深度画像分析(上)
引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...
基于当前项目通过npm包形式暴露公共组件
1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹,并新增内容 3.创建package文件夹...
【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...
基于TurtleBot3在Gazebo地图实现机器人远程控制
1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...
GitFlow 工作模式(详解)
今天再学项目的过程中遇到使用gitflow模式管理代码,因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存,无论是github还是gittee,都是一种基于git去保存代码的形式,这样保存代码…...
【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...
Caliper 负载(Workload)详细解析
Caliper 负载(Workload)详细解析 负载(Workload)是 Caliper 性能测试的核心部分,它定义了测试期间要执行的具体合约调用行为和交易模式。下面我将全面深入地讲解负载的各个方面。 一、负载模块基本结构 一个典型的负载模块(如 workload.js)包含以下基本结构: use strict;/…...
C语言中提供的第三方库之哈希表实现
一. 简介 前面一篇文章简单学习了C语言中第三方库(uthash库)提供对哈希表的操作,文章如下: C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...
给网站添加live2d看板娘
给网站添加live2d看板娘 参考文献: stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下,文章也主…...
WEB3全栈开发——面试专业技能点P7前端与链上集成
一、Next.js技术栈 ✅ 概念介绍 Next.js 是一个基于 React 的 服务端渲染(SSR)与静态网站生成(SSG) 框架,由 Vercel 开发。它简化了构建生产级 React 应用的过程,并内置了很多特性: ✅ 文件系…...
