当前位置: 首页 > news >正文

RocketMQ如何安全的批量发送消息❓

优点:

批量发送消息可以提高rocketmq的生产者性能和吞吐量。

使用场景:

  1. 发送大量小型消息时;
  2. 需要降低消息发送延迟时;
  3. 需要提高生产者性能时;

注意事项:

  1. 消息列表的大小不能超过broker设置的最大消息大小;
  2. 消息列表的大小不能超过生产证设置的maxMessageSize 参数,此参数默认为 4MB;
  3. 批量发送消息不支持消息事务;
  4. 如果代码在发送消息列表时发生异常,则可能会发生部分消息发送成功,部分消息发送失败的情况。如果要确保所有消息都已成功发送,则需要增加错误处理逻辑和消息重试机制;


批量发送消息为什么要限制maxMessageSize❓

消息列表的大小不能超过生产者设置的maxMessageSize参数,主要是为了避免消息发送延迟和消息过大导致broker出现性能问题。如果尝试发送大于maxMessageSize的消息,RocketMQ会抛出MessageTooLargeException异常,并且消息不会被发送到broker。

如果开发者在开发时遇到了消息列表大小超过maxMessageSize的情况,可以考虑以下几种处理方式:

    1. 提升maxMessageSize参数的大小,这样可以容纳更大的消息列表。但是,需要注意在提升参数大小时,要考虑到RocketMQ broker的性能和网络带宽等因素。
    2. 考虑将消息列表进行拆分,然后分批发送。这样可以避免一次发送过多的消息。
    3. 计算消息的大小并进行压缩。可以使用一些压缩算法,如 LZ4、Snappy 等,对消息进行压缩,以减小消息的大小。
    4. 对超过 maxMessageSize 的消息进行过滤或其他处理。可以通过业务逻辑对消息进行分组或分类,对超过 maxMessageSize 的消息进行过滤或其他处理,以避免发送超出限制的消息。

代码实现

package com.resource.sync.rocketmq;import java.util.Iterator;
import java.util.List;/*** @description:消息分割,在rocketmq中,一次性发送消息的长度不可超过4mb,此时我们需要进行切割,确保消息长度小于4mb**/
public class ListSplitter<T> implements Iterator<List<T>> {/*** 分割数据大小*/private int sizeLimit;/*** 分割数据列表*/private final List<T> messages;/*** 分割索引*/private int currIndex;public ListSplitter(int sizeLimit, List<T> messages) {this.sizeLimit = sizeLimit;this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<T> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {T t = messages.get(nextIndex);totalSize = totalSize + t.toString().length();if (totalSize > sizeLimit) {break;}}List<T> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}
    private final int maxMessageSize = 1024 * 1024 * 4;/*** 消息分割(批量发送)*/private void bulkSendMsg(List<Message<String>> messageList) {// 限制数据大小ListSplitter splitter = new ListSplitter(maxMessageSize, messageList);while (splitter.hasNext()) {List<Message> nextList = splitter.next();syncBulkSendMessage("topic", nextList);}}/*** @param topic* @param list* @description:发送实时消息(批量)*/public void syncBulkSendMessage(String topic, List<Message> list) {SendResult sendResult = null;try {sendResult = rocketMQTemplate.syncSend(topic, list);if (sendResult.getSendStatus() != SendStatus.SEND_OK) {log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR.RESULT_STATUS:{},MSG_ID:{}", sendResult.getSendStatus(), sendResult.getMsgId());}if (sendResult.getSendStatus() == SendStatus.SEND_OK) {log.info("BULK_SEND_MSG_SUCCESS.MSG_ID:{}", sendResult.getMsgId());}} catch (Exception e) {log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR:{}", e);}}

相关文章:

RocketMQ如何安全的批量发送消息❓

优点&#xff1a; 批量发送消息可以提高rocketmq的生产者性能和吞吐量。 使用场景: 发送大量小型消息时&#xff1b;需要降低消息发送延迟时&#xff1b;需要提高生产者性能时&#xff1b; 注意事项&#xff1a; 消息列表的大小不能超过broker设置的最大消息大小;消息列表…...

计算机视觉与深度学习 | 基于视觉惯性紧耦合的SLAM后端优化算法

===================================================== github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 ===================================================== 基于视觉惯性紧耦合的SLAM后端优化算法 引言视觉惯性联合初始化非线性优…...

GDI+ 绘制透明图

目录 一、GDI+ 准备工作 1、线程中添加GDI+支持 2、Gdiplus::Bitmap 1)、从文件创建位图...

【Java】IntelliJ IDEA使用JDBC连接MySQL数据库并写入数据

目录 0 准备工作1 创建Java项目2 添加JDBC 驱动程序3 创建数据库连接配置文件4 创建一个 Java 类来连接和操作数据库5 运行应用程序 在 IntelliJ IDEA 中连接 MySQL 数据库并将数据存储在数据表中&#xff0c;使用 Java 和 JDBC&#xff08;Java Database Connectivity&#xf…...

Linux Hadoop平台伪分布式安装

Linux Hadoop 伪分布式安装 1. JDK2. Hadoop3. MysqlHive3.1 Mysql8安装3.2 Hive安装 4. Spark4.1 Maven安装4.2 Scala安装4.3 Spark编译并安装 5. Zookeeper6. HBase 版本概要&#xff1a; jdk&#xff1a; jdk-8u391-linux-x64.tar.gzhadoop&#xff1a;hadoop-3.3.1.tar.gzh…...

【STM32-DSP库的使用】基于Keil5 + STM32CubeMX 手动添加、库添加方式

STM32-DSP库的使用 一.CMSIS-DSP1.1 DSP库简介1.2 支持的函数类别1.3 宏定义 二、操作2.1 STM32CubeMX 配置基本工程2.2 Lib库的方式实现(推荐)2.3 手动添加DSP文件&#xff08;可以下载官方最新库&#xff0c;功能齐全&#xff09; 三、MFCC测试DSP加速效果 为验证语音识别MFC…...

createElement的用法

目录 一&#xff1a;介绍 二&#xff1a;语法与例子 1、语法 2、一些例子 例1&#xff1a; 例2&#xff1a; 例3&#xff1a; 3、第二种写法 一&#xff1a;介绍 document.createElement()是在对象中创建一个对象&#xff0c;要与appendChild() 或 insertBefore()方法…...

Mabitys总结

一、ORM ORM(Object/Relation Mapping)&#xff0c;中文名称&#xff1a;对象/关系 映射。是一种解决数据库发展和面向对象编程语言发展不匹配问题而出现的技术。 使用JDBC技术时&#xff0c;手动实现ORM映射&#xff1a; 使用ORM时&#xff0c;自动关系映射&#xff1a; &am…...

JAVA安全之Log4j-Jndi注入原理以及利用方式

什么是JNDI&#xff1f; JDNI&#xff08;Java Naming and Directory Interface&#xff09;是Java命名和目录接口&#xff0c;它提供了统一的访问命名和目录服务的API。 JDNI主要通过JNDI SPI&#xff08;Service Provider Interface&#xff09;规范来实现&#xff0c;该规…...

Spring源码系列-框架中的设计模式

简单工厂 实现方式&#xff1a; BeanFactory。Spring中的BeanFactory就是简单工厂模式的体现&#xff0c;根据传入一个唯一的标识来获得Bean对象&#xff0c;但是否是在传入参数后创建还是传入参数前创建这个要根据具体情况来定。 实质&#xff1a; 由一个工厂…...

数据的读取和保存-MATLAB

1 序言 在进行数据处理时&#xff0c;经常需要写代码对保存在文件中的数据进行读取→处理→保存的操作&#xff0c;流程图如下&#xff1a; 笔者每次在进行上述操作时&#xff0c;都需要百度如何“选中目标文件”以及如何“将处理好的数据保存到目标文件中”&#xff0c;对这一…...

C++ 输入、输出和整数运算

【问题描述】 编写一个程序&#xff0c;读入两个整数&#xff0c;计算并输出他们的和、积、商和余数。 【输入形式】 程序运行到输入时&#xff0c;不要显示输入提示信息。 输入为两个整数&#xff08;在问题描述中记作A和B&#xff0c;程序中请自定变量名&#xff09;,A和B使…...

Element Plus 解决组件显示英文问题

要解决Element Plus日历组件显示英文的问题&#xff0c;可以使用Element Plus提供的国际化功能&#xff0c;切换成中文语言。下面是一个简单的示例&#xff1a; 首先&#xff0c;在main.ts或者你的入口文件中引入Element Plus的中文语言包和Vue I18n&#xff1a; import { cr…...

sqlite3.NotSupportedError: deterministic=True requires SQLite 3.8.3 or higher

问题描述 sqlite3.NotSupportedError: deterministicTrue requires SQLite 3.8.3 or higher 解决方法 A kind of solution is changing the database from sqlite3 to pysqlite3. After acticate the virtualenv, install pysqlite. pip3 install pysqlite3 pip3 install …...

单线程介绍、ECMAScript介绍、操作系统Windows、Linux 和 macOS

目录 单线程介绍ECMAScript介绍操作系统Windows、Linux 和 macOS &#x1f44d; 点赞&#xff0c;你的认可是我创作的动力&#xff01; ⭐️ 收藏&#xff0c;你的青睐是我努力的方向&#xff01; ✏️ 评论&#xff0c;你的意见是我进步的财富&#xff01; 单线程介绍 单线…...

【Docker】iptables基本原理

在当今数字化时代&#xff0c;网络安全问题变得越来越重要。为了保护我们的网络免受恶意攻击和未经授权的访问&#xff0c;我们需要使用一些工具来加强网络的安全性。其中&#xff0c;iptables是一个强大而受欢迎的防火墙工具&#xff0c;它可以帮助我们控制网络流量并保护网络…...

微服务架构——笔记(3)Eureka

微服务架构——笔记&#xff08;3&#xff09; 基于分布式的微服务架构 本次笔记为 此次项目的记录&#xff0c;便于整理思路&#xff0c;仅供参考&#xff0c;笔者也将会让程序更加完善 内容包括&#xff1a;1.支付模块、2.消费者订单模块、支付微服务入驻Eureka、Eureka集群…...

网络编程套接字(2)——简单的TCP网络程序

文章目录 一.简单的TCP网络程序1.服务端创建套接字2.服务端绑定3.服务端监听4.服务端获取连接5.服务端处理请求6.客户端创建套接字7.客户端连接服务器8.客户端发起请求9.服务器测试10.单执行流服务器的弊端 二.多进程版的TCP网络程序1.捕捉SIGCHLD信号2.让孙子进程提供服务 三.…...

MySQL数据库的简单的面试题

1、MySQL有哪些锁机制 MySQL有以下几种机制&#xff1a; 行级锁&#xff1a;行极锁在mysql 中最常用的锁机制&#xff0c;它只针对表的某一行进行加锁不受影响。MySQL的行级锁分为共享锁和排他锁两种类型&#xff0c;共享锁和排它锁不能同时存在于一行。 表级锁&#xff1a;表…...

hbuilderx打包应用上传到app store构建版本的教程

简介&#xff1a; 将ipa上架app store的过程中&#xff0c;发现需要将打包的ipa文件上传到app store的构建版本里&#xff0c;但是苹果官方推荐的上传工具&#xff0c;只有xcode和transporter等工具&#xff0c;这些工具是不能安装在windows电脑的。那么有没有windows电脑的上传…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...

管理学院权限管理系统开发总结

文章目录 &#x1f393; 管理学院权限管理系统开发总结 - 现代化Web应用实践之路&#x1f4dd; 项目概述&#x1f3d7;️ 技术架构设计后端技术栈前端技术栈 &#x1f4a1; 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 &#x1f5c4;️ 数据库设…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;已成为技术领域的焦点。从智能写作到代码生成&#xff0c;LLM 的应用场景不断扩展&#xff0c;深刻改变了我们的工作和生活方式。然而&#xff0c;理解这些模型的内部…...

毫米波雷达基础理论(3D+4D)

3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文&#xff1a; 一文入门汽车毫米波雷达基本原理 &#xff1a;https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...

Linux系统部署KES

1、安装准备 1.版本说明V008R006C009B0014 V008&#xff1a;是version产品的大版本。 R006&#xff1a;是release产品特性版本。 C009&#xff1a;是通用版 B0014&#xff1a;是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存&#xff1a;1GB 以上 硬盘&#xf…...

深入理解Optional:处理空指针异常

1. 使用Optional处理可能为空的集合 在Java开发中&#xff0c;集合判空是一个常见但容易出错的场景。传统方式虽然可行&#xff0c;但存在一些潜在问题&#xff1a; // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...