当前位置: 首页 > news >正文

010 rocketmq批量消息

文章目录

  • 批量消息
  • BatchProducer.java
  • BatchConsumer.java

批量消息

批量发送可以提⾼发送性能,但有⼀定的限制:
topic 相同
waitStoreMsgOK 相同 (⾸先我们建设消息的iswaitstoremsgok=true(默认为true), 如果没有异常,我们将始终收到"OK",org.apache.rocketmq.common.message.Message#isWaitStoreMsgOK)
不支持延时发送
⼀批消息的大小不能⼤于 4M(DefaultMQProducer.maxMessageSize)
大小限制需要特殊注意,因为消息是动态的,不注意的话就可能超限,就会报错:
计算消息的大小
= (topic + body + (key + value) * N) * 吞吐量

int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();
}

BatchProducer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;public class BatchProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();String topic = "TopicTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));//then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {try {List<Message>  listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();//handle the error}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);//计算消息的大小 = (topic + body + (key + value) * N) * 吞吐量int tmpSize = message.getTopic().length() + message.getBody().length;//属性值的添加Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {//+key + valuetmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; //for log overheadif (tmpSize > SIZE_LIMIT) {//it is unexpected that single message exceeds the SIZE_LIMIT//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex == 0) {//if the next sublist has no element, add this one and then break, otherwise just breaknextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}

BatchConsumer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class BatchConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");// Subscribe one more more topics to consume.consumer.subscribe("TopicTest", "*");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}

相关文章:

010 rocketmq批量消息

文章目录 批量消息BatchProducer.javaBatchConsumer.java 批量消息 批量发送可以提⾼发送性能&#xff0c;但有⼀定的限制&#xff1a; topic 相同 waitStoreMsgOK 相同 &#xff08;⾸先我们建设消息的iswaitstoremsgoktrue(默认为true), 如果没有异常,我们将始终收到"O…...

JavaWeb后端基础(3)

原打算把Mysql操作数据库的一些知识写进去&#xff0c;但是感觉没必要&#xff0c;要是现在会的都是简单的增删改查&#xff0c;所以&#xff0c;这一篇&#xff0c;我直接从java操作数据库开始写&#xff0c;所以这一篇大致就是记一下JDBC、MyBatis、以及SpringBoot的配置文件…...

Oracle数据库基础入门(三): DQL 深入解析与实践

在 Oracle 数据库的知识体系中&#xff0c;数据查询语言&#xff08;DQL&#xff09;无疑是最为常用且关键的部分之一。对于 Java 全栈开发者而言&#xff0c;熟练掌握 DQL 不仅能高效地从数据库中获取所需数据&#xff0c;更是构建强大后端应用的基石。通过 DQL&#xff0c;我…...

P9231 [蓝桥杯 2023 省 A] 平方差

P9231 [蓝桥杯 2023 省 A] 平方差 - 洛谷 题目描述 给定 L,R&#xff0c;问 L≤x≤R 中有多少个数 x 满足存在整数 y,z 使得 xy2−z2。 输入格式 输入一行包含两个整数 L,R&#xff0c;用一个空格分隔。 输出格式 输出一行包含一个整数满足题目给定条件的 x 的数量。 输…...

贪心算法 求解思路

贪心算法简介 贪心算法是通过做一系列的选择来给出某一问题的最优解。对算法中的每一个决策点&#xff0c;做一个当时&#xff08;看起来是&#xff09;最佳的选择。这种启发式策略并不是总能产生出最优解&#xff0c;但它常常能给出最优解。 在实际设计贪心算法时&#xff0…...

2025/2/25,字节跳动后端开发一面面经

一、双方简单自我介绍 面试官先自我介绍,之后属于面试官看简历过程,基本不听。 二、实习中遇到最难的事情,怎么解决的 主要问的还是实习中做过的项目,项目难点在哪里(自己参与的地方),面对困难是怎么思考,怎么实际操作解决的。 三、项目实现细节 掌握自己项目的实…...

Buildroot 添加自定义模块-内置文件到文件系统

目录 概述实现步骤1. 创建包目录和文件结构2. 配置 Config.in3. 定义 cp_bin_files.mk4. 添加源文件install.shmy.conf 5. 配置与编译 概述 Buildroot 是一个高度可定制和模块化的嵌入式 Linux 构建系统&#xff0c;适用于从简单到复杂的各种嵌入式项目. buildroot的源码中bui…...

SpringBoot新闻推荐系统设计与实现

随着信息时代的快速发展&#xff0c;新闻推荐系统成为用户获取个性化内容的重要工具。本文将介绍一个幽络源的基于SpringBoot开发的新闻推荐系统&#xff0c;该系统功能全面&#xff0c;操作简便&#xff0c;能够满足管理员和用户的多种需求。 管理员模块 管理员模块为系统管…...

领域驱动设计:事件溯源架构简介

概述 事件溯源架构通常由3种应用设计模式组成,分别是:事件驱动(Event Driven),事件溯源(Event Source)、CQRS(读写分离)。这三种应用设计模式常见于领域驱动设计(DDD)中,但它们本身是一种应用设计的思想,不仅仅局限于DDD,每一种模式都可以单独拿出来使用。 E…...

基于Java+Spring+Mybsita+mysql的汽租车辆共享平台的设计源码+设计文档

文末获取源码数据库文档 感兴趣的可以先收藏&#xff0c;有毕设问题&#xff0c;项目以及论文撰写等问题都可以和博主沟通&#xff0c;尽最大努力帮助更多的人&#xff01; 目录 1软件需求 1.1引言 1.1.1编写目的 1.1.2背景 1.2 绪论 1.2.1&#xff0d;Internet与…...

深度学习的正则化深入探讨

文章目录 一、说明二、学习目标三、什么是机器学习中的正则化四、了解过拟合和欠拟合五、代价函数的意义六、什么是偏差和方差&#xff1f;七、机器学习中的正则化&#xff1f; 一、说明 在训练机器学习模型时&#xff0c;模型很容易过拟合或欠拟合。为了避免这种情况&#xf…...

Token相关设计

文章目录 1. 双Token 机制概述1.1 访问令牌&#xff08;Access Token&#xff09;1.2 刷新令牌&#xff08;Refresh Token&#xff09; 2. 双Token 认证流程3. Spring Boot 具体实现3.1 生成 Token&#xff08;使用 JWT&#xff09;3.2 解析 Token3.3 登录接口&#xff08;返回…...

【时序预测】在线学习:算法选择(从线性模型到深度学习解析)

——如何为动态时序预测匹配最佳增量学习策略&#xff1f; 引言&#xff1a;在线学习的核心价值与挑战 在动态时序预测场景中&#xff08;如实时交通预测、能源消耗监控&#xff09;&#xff0c;数据以流式&#xff08;Streaming&#xff09;形式持续生成&#xff0c;且潜在的…...

React antd的datePicker自定义,封装成组件

一、antd的datePicker自定义 需求&#xff1a;用户需要为日期选择器的每个日期单元格添加一个Tooltip&#xff0c;当鼠标悬停时显示日期、可兑换流量余额和本公会可兑流量。这些数据需要从接口获取。我需要结合之前的代码&#xff0c;确保Tooltip正确显示&#xff0c;并且数据…...

学生管理前端

文章目录 首页student.html查询功能 首页 SpringBoot前端html页面放在static文件夹下&#xff1a;/src/main/resources/static 默认首页为index.html&#xff0c;我们可以用两个超链接或者两个button跳转到对应的页面。这里只是单纯的跳转页面&#xff0c;不需要提交表单等其…...

深入理解并实现自定义 unordered_map 和 unordered_set

亲爱的读者朋友们&#x1f603;&#xff0c;此文开启知识盛宴与思想碰撞&#x1f389;。 快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 在 C 的标准模板库&#xff08;STL&#xff09;中&#xff0c;unorder…...

顶顶通呼叫中心中间件(mod_cti基于FreeSWITCH)-大模型电话机器人

语音流直接对接Realtime API 多模态大模型 直接把音频流输出给大模型&#xff0c;大模型返回音频流。 顶顶通CTI对Realtime API 的支持 提供了以下2个APP可对接任意 •cti_audio_stream 通过TCP推流和播放流&#xff0c;适合用于人机对话场景。 •cti_unicast_start 通过旁…...

kinova机械臂绿色灯一闪一闪及刷机方法

一、背景 实验室有两个kinova mico机械臂&#xff0c;但经常出现操纵杆上的绿色灯一闪一闪的&#xff0c;导致无法使用操纵杆或ROS进行控制&#xff0c;下面给出官方的教程以及所需要的FS 0CPP 0008_6.2.5_mico_6dof.hex文件。 重要的东西写在前面&#xff1a; a、如果出现操…...

第16天:C++多线程完全指南 - 从基础到现代并发编程

第16天&#xff1a;C多线程完全指南 - 从基础到现代并发编程 一、多线程基础概念 1. 线程创建与管理&#xff08;C11&#xff09; #include <iostream> #include <thread>void hello() {std::cout << "Hello from thread " << std::this_…...

中科大计算机网络原理 1.5 Internt结构和ISP

一、互联网的层次化架构 ‌覆盖范围分层‌ ‌主干网&#xff08;Tier-1级&#xff09;‌ 国家级或行业级核心网络&#xff0c;承担跨区域数据传输和全球互联功能。例如中国的四大主干网&#xff08;ChinaNET、CERNET等&#xff09;以及跨国运营商&#xff08;如AT&T、Deuts…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

【磁盘】每天掌握一个Linux命令 - iostat

目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat&#xff08;I/O Statistics&#xff09;是Linux系统下用于监视系统输入输出设备和CPU使…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

Python爬虫(二):爬虫完整流程

爬虫完整流程详解&#xff08;7大核心步骤实战技巧&#xff09; 一、爬虫完整工作流程 以下是爬虫开发的完整流程&#xff0c;我将结合具体技术点和实战经验展开说明&#xff1a; 1. 目标分析与前期准备 网站技术分析&#xff1a; 使用浏览器开发者工具&#xff08;F12&…...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

Java 二维码

Java 二维码 **技术&#xff1a;**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...

Angular微前端架构:Module Federation + ngx-build-plus (Webpack)

以下是一个完整的 Angular 微前端示例&#xff0c;其中使用的是 Module Federation 和 npx-build-plus 实现了主应用&#xff08;Shell&#xff09;与子应用&#xff08;Remote&#xff09;的集成。 &#x1f6e0;️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...

华硕a豆14 Air香氛版,美学与科技的馨香融合

在快节奏的现代生活中&#xff0c;我们渴望一个能激发创想、愉悦感官的工作与生活伙伴&#xff0c;它不仅是冰冷的科技工具&#xff0c;更能触动我们内心深处的细腻情感。正是在这样的期许下&#xff0c;华硕a豆14 Air香氛版翩然而至&#xff0c;它以一种前所未有的方式&#x…...

Redis:现代应用开发的高效内存数据存储利器

一、Redis的起源与发展 Redis最初由意大利程序员Salvatore Sanfilippo在2009年开发&#xff0c;其初衷是为了满足他自己的一个项目需求&#xff0c;即需要一个高性能的键值存储系统来解决传统数据库在高并发场景下的性能瓶颈。随着项目的开源&#xff0c;Redis凭借其简单易用、…...

免费数学几何作图web平台

光锐软件免费数学工具&#xff0c;maths,数学制图&#xff0c;数学作图&#xff0c;几何作图&#xff0c;几何&#xff0c;AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...