当前位置: 首页 > news >正文

010 rocketmq批量消息

文章目录

  • 批量消息
  • BatchProducer.java
  • BatchConsumer.java

批量消息

批量发送可以提⾼发送性能,但有⼀定的限制:
topic 相同
waitStoreMsgOK 相同 (⾸先我们建设消息的iswaitstoremsgok=true(默认为true), 如果没有异常,我们将始终收到"OK",org.apache.rocketmq.common.message.Message#isWaitStoreMsgOK)
不支持延时发送
⼀批消息的大小不能⼤于 4M(DefaultMQProducer.maxMessageSize)
大小限制需要特殊注意,因为消息是动态的,不注意的话就可能超限,就会报错:
计算消息的大小
= (topic + body + (key + value) * N) * 吞吐量

int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();
}

BatchProducer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;public class BatchProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();String topic = "TopicTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));//then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {try {List<Message>  listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();//handle the error}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);//计算消息的大小 = (topic + body + (key + value) * N) * 吞吐量int tmpSize = message.getTopic().length() + message.getBody().length;//属性值的添加Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {//+key + valuetmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; //for log overheadif (tmpSize > SIZE_LIMIT) {//it is unexpected that single message exceeds the SIZE_LIMIT//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex == 0) {//if the next sublist has no element, add this one and then break, otherwise just breaknextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}

BatchConsumer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class BatchConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");// Subscribe one more more topics to consume.consumer.subscribe("TopicTest", "*");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}

相关文章:

010 rocketmq批量消息

文章目录 批量消息BatchProducer.javaBatchConsumer.java 批量消息 批量发送可以提⾼发送性能&#xff0c;但有⼀定的限制&#xff1a; topic 相同 waitStoreMsgOK 相同 &#xff08;⾸先我们建设消息的iswaitstoremsgoktrue(默认为true), 如果没有异常,我们将始终收到"O…...

JavaWeb后端基础(3)

原打算把Mysql操作数据库的一些知识写进去&#xff0c;但是感觉没必要&#xff0c;要是现在会的都是简单的增删改查&#xff0c;所以&#xff0c;这一篇&#xff0c;我直接从java操作数据库开始写&#xff0c;所以这一篇大致就是记一下JDBC、MyBatis、以及SpringBoot的配置文件…...

Oracle数据库基础入门(三): DQL 深入解析与实践

在 Oracle 数据库的知识体系中&#xff0c;数据查询语言&#xff08;DQL&#xff09;无疑是最为常用且关键的部分之一。对于 Java 全栈开发者而言&#xff0c;熟练掌握 DQL 不仅能高效地从数据库中获取所需数据&#xff0c;更是构建强大后端应用的基石。通过 DQL&#xff0c;我…...

P9231 [蓝桥杯 2023 省 A] 平方差

P9231 [蓝桥杯 2023 省 A] 平方差 - 洛谷 题目描述 给定 L,R&#xff0c;问 L≤x≤R 中有多少个数 x 满足存在整数 y,z 使得 xy2−z2。 输入格式 输入一行包含两个整数 L,R&#xff0c;用一个空格分隔。 输出格式 输出一行包含一个整数满足题目给定条件的 x 的数量。 输…...

贪心算法 求解思路

贪心算法简介 贪心算法是通过做一系列的选择来给出某一问题的最优解。对算法中的每一个决策点&#xff0c;做一个当时&#xff08;看起来是&#xff09;最佳的选择。这种启发式策略并不是总能产生出最优解&#xff0c;但它常常能给出最优解。 在实际设计贪心算法时&#xff0…...

2025/2/25,字节跳动后端开发一面面经

一、双方简单自我介绍 面试官先自我介绍,之后属于面试官看简历过程,基本不听。 二、实习中遇到最难的事情,怎么解决的 主要问的还是实习中做过的项目,项目难点在哪里(自己参与的地方),面对困难是怎么思考,怎么实际操作解决的。 三、项目实现细节 掌握自己项目的实…...

Buildroot 添加自定义模块-内置文件到文件系统

目录 概述实现步骤1. 创建包目录和文件结构2. 配置 Config.in3. 定义 cp_bin_files.mk4. 添加源文件install.shmy.conf 5. 配置与编译 概述 Buildroot 是一个高度可定制和模块化的嵌入式 Linux 构建系统&#xff0c;适用于从简单到复杂的各种嵌入式项目. buildroot的源码中bui…...

SpringBoot新闻推荐系统设计与实现

随着信息时代的快速发展&#xff0c;新闻推荐系统成为用户获取个性化内容的重要工具。本文将介绍一个幽络源的基于SpringBoot开发的新闻推荐系统&#xff0c;该系统功能全面&#xff0c;操作简便&#xff0c;能够满足管理员和用户的多种需求。 管理员模块 管理员模块为系统管…...

领域驱动设计:事件溯源架构简介

概述 事件溯源架构通常由3种应用设计模式组成,分别是:事件驱动(Event Driven),事件溯源(Event Source)、CQRS(读写分离)。这三种应用设计模式常见于领域驱动设计(DDD)中,但它们本身是一种应用设计的思想,不仅仅局限于DDD,每一种模式都可以单独拿出来使用。 E…...

基于Java+Spring+Mybsita+mysql的汽租车辆共享平台的设计源码+设计文档

文末获取源码数据库文档 感兴趣的可以先收藏&#xff0c;有毕设问题&#xff0c;项目以及论文撰写等问题都可以和博主沟通&#xff0c;尽最大努力帮助更多的人&#xff01; 目录 1软件需求 1.1引言 1.1.1编写目的 1.1.2背景 1.2 绪论 1.2.1&#xff0d;Internet与…...

深度学习的正则化深入探讨

文章目录 一、说明二、学习目标三、什么是机器学习中的正则化四、了解过拟合和欠拟合五、代价函数的意义六、什么是偏差和方差&#xff1f;七、机器学习中的正则化&#xff1f; 一、说明 在训练机器学习模型时&#xff0c;模型很容易过拟合或欠拟合。为了避免这种情况&#xf…...

Token相关设计

文章目录 1. 双Token 机制概述1.1 访问令牌&#xff08;Access Token&#xff09;1.2 刷新令牌&#xff08;Refresh Token&#xff09; 2. 双Token 认证流程3. Spring Boot 具体实现3.1 生成 Token&#xff08;使用 JWT&#xff09;3.2 解析 Token3.3 登录接口&#xff08;返回…...

【时序预测】在线学习:算法选择(从线性模型到深度学习解析)

——如何为动态时序预测匹配最佳增量学习策略&#xff1f; 引言&#xff1a;在线学习的核心价值与挑战 在动态时序预测场景中&#xff08;如实时交通预测、能源消耗监控&#xff09;&#xff0c;数据以流式&#xff08;Streaming&#xff09;形式持续生成&#xff0c;且潜在的…...

React antd的datePicker自定义,封装成组件

一、antd的datePicker自定义 需求&#xff1a;用户需要为日期选择器的每个日期单元格添加一个Tooltip&#xff0c;当鼠标悬停时显示日期、可兑换流量余额和本公会可兑流量。这些数据需要从接口获取。我需要结合之前的代码&#xff0c;确保Tooltip正确显示&#xff0c;并且数据…...

学生管理前端

文章目录 首页student.html查询功能 首页 SpringBoot前端html页面放在static文件夹下&#xff1a;/src/main/resources/static 默认首页为index.html&#xff0c;我们可以用两个超链接或者两个button跳转到对应的页面。这里只是单纯的跳转页面&#xff0c;不需要提交表单等其…...

深入理解并实现自定义 unordered_map 和 unordered_set

亲爱的读者朋友们&#x1f603;&#xff0c;此文开启知识盛宴与思想碰撞&#x1f389;。 快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 在 C 的标准模板库&#xff08;STL&#xff09;中&#xff0c;unorder…...

顶顶通呼叫中心中间件(mod_cti基于FreeSWITCH)-大模型电话机器人

语音流直接对接Realtime API 多模态大模型 直接把音频流输出给大模型&#xff0c;大模型返回音频流。 顶顶通CTI对Realtime API 的支持 提供了以下2个APP可对接任意 •cti_audio_stream 通过TCP推流和播放流&#xff0c;适合用于人机对话场景。 •cti_unicast_start 通过旁…...

kinova机械臂绿色灯一闪一闪及刷机方法

一、背景 实验室有两个kinova mico机械臂&#xff0c;但经常出现操纵杆上的绿色灯一闪一闪的&#xff0c;导致无法使用操纵杆或ROS进行控制&#xff0c;下面给出官方的教程以及所需要的FS 0CPP 0008_6.2.5_mico_6dof.hex文件。 重要的东西写在前面&#xff1a; a、如果出现操…...

第16天:C++多线程完全指南 - 从基础到现代并发编程

第16天&#xff1a;C多线程完全指南 - 从基础到现代并发编程 一、多线程基础概念 1. 线程创建与管理&#xff08;C11&#xff09; #include <iostream> #include <thread>void hello() {std::cout << "Hello from thread " << std::this_…...

中科大计算机网络原理 1.5 Internt结构和ISP

一、互联网的层次化架构 ‌覆盖范围分层‌ ‌主干网&#xff08;Tier-1级&#xff09;‌ 国家级或行业级核心网络&#xff0c;承担跨区域数据传输和全球互联功能。例如中国的四大主干网&#xff08;ChinaNET、CERNET等&#xff09;以及跨国运营商&#xff08;如AT&T、Deuts…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

遍历 Map 类型集合的方法汇总

1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...

质量体系的重要

质量体系是为确保产品、服务或过程质量满足规定要求&#xff0c;由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面&#xff1a; &#x1f3db;️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限&#xff0c;形成层级清晰的管理网络&#xf…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践

6月5日&#xff0c;2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席&#xff0c;并作《智能体在安全领域的应用实践》主题演讲&#xff0c;分享了在智能体在安全领域的突破性实践。他指出&#xff0c;百度通过将安全能力…...

tree 树组件大数据卡顿问题优化

问题背景 项目中有用到树组件用来做文件目录&#xff0c;但是由于这个树组件的节点越来越多&#xff0c;导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多&#xff0c;导致的浏览器卡顿&#xff0c;这里很明显就需要用到虚拟列表的技术&…...

免费数学几何作图web平台

光锐软件免费数学工具&#xff0c;maths,数学制图&#xff0c;数学作图&#xff0c;几何作图&#xff0c;几何&#xff0c;AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...

第7篇:中间件全链路监控与 SQL 性能分析实践

7.1 章节导读 在构建数据库中间件的过程中&#xff0c;可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中&#xff0c;必须做到&#xff1a; &#x1f50d; 追踪每一条 SQL 的生命周期&#xff08;从入口到数据库执行&#xff09;&#…...

tauri项目,如何在rust端读取电脑环境变量

如果想在前端通过调用来获取环境变量的值&#xff0c;可以通过标准的依赖&#xff1a; std::env::var(name).ok() 想在前端通过调用来获取&#xff0c;可以写一个command函数&#xff1a; #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...

stm32进入Infinite_Loop原因(因为有系统中断函数未自定义实现)

这是系统中断服务程序的默认处理汇编函数&#xff0c;如果我们没有定义实现某个中断函数&#xff0c;那么当stm32产生了该中断时&#xff0c;就会默认跑这里来了&#xff0c;所以我们打开了什么中断&#xff0c;一定要记得实现对应的系统中断函数&#xff0c;否则会进来一直循环…...