当前位置: 首页 > news >正文

010 rocketmq批量消息

文章目录

  • 批量消息
  • BatchProducer.java
  • BatchConsumer.java

批量消息

批量发送可以提⾼发送性能,但有⼀定的限制:
topic 相同
waitStoreMsgOK 相同 (⾸先我们建设消息的iswaitstoremsgok=true(默认为true), 如果没有异常,我们将始终收到"OK",org.apache.rocketmq.common.message.Message#isWaitStoreMsgOK)
不支持延时发送
⼀批消息的大小不能⼤于 4M(DefaultMQProducer.maxMessageSize)
大小限制需要特殊注意,因为消息是动态的,不注意的话就可能超限,就会报错:
计算消息的大小
= (topic + body + (key + value) * N) * 吞吐量

int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();
}

BatchProducer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;public class BatchProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();String topic = "TopicTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));//then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {try {List<Message>  listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();//handle the error}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);//计算消息的大小 = (topic + body + (key + value) * N) * 吞吐量int tmpSize = message.getTopic().length() + message.getBody().length;//属性值的添加Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {//+key + valuetmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; //for log overheadif (tmpSize > SIZE_LIMIT) {//it is unexpected that single message exceeds the SIZE_LIMIT//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex == 0) {//if the next sublist has no element, add this one and then break, otherwise just breaknextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}

BatchConsumer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class BatchConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");// Subscribe one more more topics to consume.consumer.subscribe("TopicTest", "*");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}

相关文章:

010 rocketmq批量消息

文章目录 批量消息BatchProducer.javaBatchConsumer.java 批量消息 批量发送可以提⾼发送性能&#xff0c;但有⼀定的限制&#xff1a; topic 相同 waitStoreMsgOK 相同 &#xff08;⾸先我们建设消息的iswaitstoremsgoktrue(默认为true), 如果没有异常,我们将始终收到"O…...

JavaWeb后端基础(3)

原打算把Mysql操作数据库的一些知识写进去&#xff0c;但是感觉没必要&#xff0c;要是现在会的都是简单的增删改查&#xff0c;所以&#xff0c;这一篇&#xff0c;我直接从java操作数据库开始写&#xff0c;所以这一篇大致就是记一下JDBC、MyBatis、以及SpringBoot的配置文件…...

Oracle数据库基础入门(三): DQL 深入解析与实践

在 Oracle 数据库的知识体系中&#xff0c;数据查询语言&#xff08;DQL&#xff09;无疑是最为常用且关键的部分之一。对于 Java 全栈开发者而言&#xff0c;熟练掌握 DQL 不仅能高效地从数据库中获取所需数据&#xff0c;更是构建强大后端应用的基石。通过 DQL&#xff0c;我…...

P9231 [蓝桥杯 2023 省 A] 平方差

P9231 [蓝桥杯 2023 省 A] 平方差 - 洛谷 题目描述 给定 L,R&#xff0c;问 L≤x≤R 中有多少个数 x 满足存在整数 y,z 使得 xy2−z2。 输入格式 输入一行包含两个整数 L,R&#xff0c;用一个空格分隔。 输出格式 输出一行包含一个整数满足题目给定条件的 x 的数量。 输…...

贪心算法 求解思路

贪心算法简介 贪心算法是通过做一系列的选择来给出某一问题的最优解。对算法中的每一个决策点&#xff0c;做一个当时&#xff08;看起来是&#xff09;最佳的选择。这种启发式策略并不是总能产生出最优解&#xff0c;但它常常能给出最优解。 在实际设计贪心算法时&#xff0…...

2025/2/25,字节跳动后端开发一面面经

一、双方简单自我介绍 面试官先自我介绍,之后属于面试官看简历过程,基本不听。 二、实习中遇到最难的事情,怎么解决的 主要问的还是实习中做过的项目,项目难点在哪里(自己参与的地方),面对困难是怎么思考,怎么实际操作解决的。 三、项目实现细节 掌握自己项目的实…...

Buildroot 添加自定义模块-内置文件到文件系统

目录 概述实现步骤1. 创建包目录和文件结构2. 配置 Config.in3. 定义 cp_bin_files.mk4. 添加源文件install.shmy.conf 5. 配置与编译 概述 Buildroot 是一个高度可定制和模块化的嵌入式 Linux 构建系统&#xff0c;适用于从简单到复杂的各种嵌入式项目. buildroot的源码中bui…...

SpringBoot新闻推荐系统设计与实现

随着信息时代的快速发展&#xff0c;新闻推荐系统成为用户获取个性化内容的重要工具。本文将介绍一个幽络源的基于SpringBoot开发的新闻推荐系统&#xff0c;该系统功能全面&#xff0c;操作简便&#xff0c;能够满足管理员和用户的多种需求。 管理员模块 管理员模块为系统管…...

领域驱动设计:事件溯源架构简介

概述 事件溯源架构通常由3种应用设计模式组成,分别是:事件驱动(Event Driven),事件溯源(Event Source)、CQRS(读写分离)。这三种应用设计模式常见于领域驱动设计(DDD)中,但它们本身是一种应用设计的思想,不仅仅局限于DDD,每一种模式都可以单独拿出来使用。 E…...

基于Java+Spring+Mybsita+mysql的汽租车辆共享平台的设计源码+设计文档

文末获取源码数据库文档 感兴趣的可以先收藏&#xff0c;有毕设问题&#xff0c;项目以及论文撰写等问题都可以和博主沟通&#xff0c;尽最大努力帮助更多的人&#xff01; 目录 1软件需求 1.1引言 1.1.1编写目的 1.1.2背景 1.2 绪论 1.2.1&#xff0d;Internet与…...

深度学习的正则化深入探讨

文章目录 一、说明二、学习目标三、什么是机器学习中的正则化四、了解过拟合和欠拟合五、代价函数的意义六、什么是偏差和方差&#xff1f;七、机器学习中的正则化&#xff1f; 一、说明 在训练机器学习模型时&#xff0c;模型很容易过拟合或欠拟合。为了避免这种情况&#xf…...

Token相关设计

文章目录 1. 双Token 机制概述1.1 访问令牌&#xff08;Access Token&#xff09;1.2 刷新令牌&#xff08;Refresh Token&#xff09; 2. 双Token 认证流程3. Spring Boot 具体实现3.1 生成 Token&#xff08;使用 JWT&#xff09;3.2 解析 Token3.3 登录接口&#xff08;返回…...

【时序预测】在线学习:算法选择(从线性模型到深度学习解析)

——如何为动态时序预测匹配最佳增量学习策略&#xff1f; 引言&#xff1a;在线学习的核心价值与挑战 在动态时序预测场景中&#xff08;如实时交通预测、能源消耗监控&#xff09;&#xff0c;数据以流式&#xff08;Streaming&#xff09;形式持续生成&#xff0c;且潜在的…...

React antd的datePicker自定义,封装成组件

一、antd的datePicker自定义 需求&#xff1a;用户需要为日期选择器的每个日期单元格添加一个Tooltip&#xff0c;当鼠标悬停时显示日期、可兑换流量余额和本公会可兑流量。这些数据需要从接口获取。我需要结合之前的代码&#xff0c;确保Tooltip正确显示&#xff0c;并且数据…...

学生管理前端

文章目录 首页student.html查询功能 首页 SpringBoot前端html页面放在static文件夹下&#xff1a;/src/main/resources/static 默认首页为index.html&#xff0c;我们可以用两个超链接或者两个button跳转到对应的页面。这里只是单纯的跳转页面&#xff0c;不需要提交表单等其…...

深入理解并实现自定义 unordered_map 和 unordered_set

亲爱的读者朋友们&#x1f603;&#xff0c;此文开启知识盛宴与思想碰撞&#x1f389;。 快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 在 C 的标准模板库&#xff08;STL&#xff09;中&#xff0c;unorder…...

顶顶通呼叫中心中间件(mod_cti基于FreeSWITCH)-大模型电话机器人

语音流直接对接Realtime API 多模态大模型 直接把音频流输出给大模型&#xff0c;大模型返回音频流。 顶顶通CTI对Realtime API 的支持 提供了以下2个APP可对接任意 •cti_audio_stream 通过TCP推流和播放流&#xff0c;适合用于人机对话场景。 •cti_unicast_start 通过旁…...

kinova机械臂绿色灯一闪一闪及刷机方法

一、背景 实验室有两个kinova mico机械臂&#xff0c;但经常出现操纵杆上的绿色灯一闪一闪的&#xff0c;导致无法使用操纵杆或ROS进行控制&#xff0c;下面给出官方的教程以及所需要的FS 0CPP 0008_6.2.5_mico_6dof.hex文件。 重要的东西写在前面&#xff1a; a、如果出现操…...

第16天:C++多线程完全指南 - 从基础到现代并发编程

第16天&#xff1a;C多线程完全指南 - 从基础到现代并发编程 一、多线程基础概念 1. 线程创建与管理&#xff08;C11&#xff09; #include <iostream> #include <thread>void hello() {std::cout << "Hello from thread " << std::this_…...

中科大计算机网络原理 1.5 Internt结构和ISP

一、互联网的层次化架构 ‌覆盖范围分层‌ ‌主干网&#xff08;Tier-1级&#xff09;‌ 国家级或行业级核心网络&#xff0c;承担跨区域数据传输和全球互联功能。例如中国的四大主干网&#xff08;ChinaNET、CERNET等&#xff09;以及跨国运营商&#xff08;如AT&T、Deuts…...

Windows离线语音转文字终极指南:TMSpeech让会议记录变得简单高效!

Windows离线语音转文字终极指南&#xff1a;TMSpeech让会议记录变得简单高效&#xff01; 【免费下载链接】TMSpeech 腾讯会议摸鱼工具 项目地址: https://gitcode.com/gh_mirrors/tm/TMSpeech 还在为会议记录手忙脚乱吗&#xff1f;担心语音识别软件泄露隐私&#xff1…...

5分钟掌握微信防撤回:WeChatIntercept新手完整指南

5分钟掌握微信防撤回&#xff1a;WeChatIntercept新手完整指南 【免费下载链接】WeChatIntercept 微信防撤回插件&#xff0c;一键安装&#xff0c;仅MAC可用&#xff0c;支持v3.7.0微信 项目地址: https://gitcode.com/gh_mirrors/we/WeChatIntercept 还在为错过微信撤…...

ET框架:C#全栈游戏开发的热更与服务端重构实践

1. ET框架不是“又一个Unity网络库”&#xff0c;而是重构服务器开发范式的底层工具链很多人第一次看到“ET框架”四个字&#xff0c;下意识会把它归类为“Unity里用的Socket封装库”或者“带点RPC味道的通信中间件”——这种理解偏差&#xff0c;恰恰是踩坑的起点。我2018年在…...

【芯片测试】:6. 向量、Sequencer 指令与高速串行 IO

Pattern 详解&#xff1a;向量、Sequencer 指令与高速串行 IO系列&#xff1a; Advantest V93000 SmarTest 8 核心概念解析&#xff5c;第 6 篇&#xff08;共 8 篇&#xff09; 适合读者&#xff1a; 需要理解数字测试激励数据结构的工程师前言 Pattern&#xff08;模式&#…...

基于信息论与数据压缩的AI文本检测:AIDetx原理与工程实践

1. 项目概述&#xff1a;当AI写作遇上信息论 最近几年&#xff0c;AI生成文本的能力突飞猛进&#xff0c;从写邮件、做摘要到创作故事&#xff0c;几乎无所不能。但随之而来的一个现实问题也摆在了我们面前&#xff1a;如何分辨一段文字究竟是出自人类之手&#xff0c;还是由AI…...

Rust异步编程实战:构建高性能并发应用

引言 异步编程是构建高性能后端服务的关键技术。作为从Python转向Rust的开发者&#xff0c;我发现Rust的异步模型与Python有很大不同。Rust的异步编程基于协程和事件驱动&#xff0c;通过Tokio运行时实现高效的并发执行。本文将深入探讨Rust异步编程的核心概念、实践模式和性能…...

openEuler 22.03 LST上安装RealVNC 6.11,我踩过的那些依赖坑(附离线包下载方法)

在openEuler 22.03 LST离线环境中部署RealVNC 6.11的完整指南当我们需要在隔离网络的生产环境中部署远程桌面服务时&#xff0c;依赖管理往往成为最棘手的挑战。本文将分享我在openEuler 22.03 LST系统上安装RealVNC 6.11时积累的实战经验&#xff0c;特别是如何处理复杂的离线…...

当Agent开始质疑你的原始数据——AI驱动的数据质量自治体系构建(含动态污点追踪与因果溯源模块)

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;当Agent开始质疑你的原始数据——AI驱动的数据质量自治体系构建&#xff08;含动态污点追踪与因果溯源模块&#xff09; 在传统数据治理范式中&#xff0c;数据质量校验往往滞后于数据摄入&#xff0c;…...

MDK Middleware网络组件的嵌入式安全防护解析

1. MDK Middleware网络组件的安全特性解析在嵌入式系统开发中&#xff0c;网络安全往往是最容易被忽视却又至关重要的环节。作为Keil MDK开发环境的核心组件&#xff0c;Middleware Network为Cortex-M系列微控制器提供了轻量级TCP/IP协议栈实现。不同于桌面级操作系统自带的网络…...

Python之enc-dotenv包语法、参数和实际应用案例

Python enc-dotenv 包完整详解 enc-dotenv 是加密版 python-dotenv 核心增强包&#xff0c;专门解决明文存储环境变量&#xff08;密钥、密码、Token&#xff09; 的安全风险。它能将 .env 文件加密存储&#xff0c;运行时自动解密加载&#xff0c;彻底避免敏感配置明文泄露。 …...