010 rocketmq批量消息
文章目录
- 批量消息
- BatchProducer.java
- BatchConsumer.java
批量消息
批量发送可以提⾼发送性能,但有⼀定的限制:
topic 相同
waitStoreMsgOK 相同 (⾸先我们建设消息的iswaitstoremsgok=true(默认为true), 如果没有异常,我们将始终收到"OK",org.apache.rocketmq.common.message.Message#isWaitStoreMsgOK)
不支持延时发送
⼀批消息的大小不能⼤于 4M(DefaultMQProducer.maxMessageSize)
大小限制需要特殊注意,因为消息是动态的,不注意的话就可能超限,就会报错:
计算消息的大小
= (topic + body + (key + value) * N) * 吞吐量
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();
}
BatchProducer.java
package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;public class BatchProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();String topic = "TopicTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));//then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {try {List<Message> listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();//handle the error}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);//计算消息的大小 = (topic + body + (key + value) * N) * 吞吐量int tmpSize = message.getTopic().length() + message.getBody().length;//属性值的添加Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {//+key + valuetmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; //for log overheadif (tmpSize > SIZE_LIMIT) {//it is unexpected that single message exceeds the SIZE_LIMIT//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex == 0) {//if the next sublist has no element, add this one and then break, otherwise just breaknextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}
BatchConsumer.java
package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class BatchConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");// Subscribe one more more topics to consume.consumer.subscribe("TopicTest", "*");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}相关文章:
010 rocketmq批量消息
文章目录 批量消息BatchProducer.javaBatchConsumer.java 批量消息 批量发送可以提⾼发送性能,但有⼀定的限制: topic 相同 waitStoreMsgOK 相同 (⾸先我们建设消息的iswaitstoremsgoktrue(默认为true), 如果没有异常,我们将始终收到"O…...
JavaWeb后端基础(3)
原打算把Mysql操作数据库的一些知识写进去,但是感觉没必要,要是现在会的都是简单的增删改查,所以,这一篇,我直接从java操作数据库开始写,所以这一篇大致就是记一下JDBC、MyBatis、以及SpringBoot的配置文件…...
Oracle数据库基础入门(三): DQL 深入解析与实践
在 Oracle 数据库的知识体系中,数据查询语言(DQL)无疑是最为常用且关键的部分之一。对于 Java 全栈开发者而言,熟练掌握 DQL 不仅能高效地从数据库中获取所需数据,更是构建强大后端应用的基石。通过 DQL,我…...
P9231 [蓝桥杯 2023 省 A] 平方差
P9231 [蓝桥杯 2023 省 A] 平方差 - 洛谷 题目描述 给定 L,R,问 L≤x≤R 中有多少个数 x 满足存在整数 y,z 使得 xy2−z2。 输入格式 输入一行包含两个整数 L,R,用一个空格分隔。 输出格式 输出一行包含一个整数满足题目给定条件的 x 的数量。 输…...
贪心算法 求解思路
贪心算法简介 贪心算法是通过做一系列的选择来给出某一问题的最优解。对算法中的每一个决策点,做一个当时(看起来是)最佳的选择。这种启发式策略并不是总能产生出最优解,但它常常能给出最优解。 在实际设计贪心算法时࿰…...
2025/2/25,字节跳动后端开发一面面经
一、双方简单自我介绍 面试官先自我介绍,之后属于面试官看简历过程,基本不听。 二、实习中遇到最难的事情,怎么解决的 主要问的还是实习中做过的项目,项目难点在哪里(自己参与的地方),面对困难是怎么思考,怎么实际操作解决的。 三、项目实现细节 掌握自己项目的实…...
Buildroot 添加自定义模块-内置文件到文件系统
目录 概述实现步骤1. 创建包目录和文件结构2. 配置 Config.in3. 定义 cp_bin_files.mk4. 添加源文件install.shmy.conf 5. 配置与编译 概述 Buildroot 是一个高度可定制和模块化的嵌入式 Linux 构建系统,适用于从简单到复杂的各种嵌入式项目. buildroot的源码中bui…...
SpringBoot新闻推荐系统设计与实现
随着信息时代的快速发展,新闻推荐系统成为用户获取个性化内容的重要工具。本文将介绍一个幽络源的基于SpringBoot开发的新闻推荐系统,该系统功能全面,操作简便,能够满足管理员和用户的多种需求。 管理员模块 管理员模块为系统管…...
领域驱动设计:事件溯源架构简介
概述 事件溯源架构通常由3种应用设计模式组成,分别是:事件驱动(Event Driven),事件溯源(Event Source)、CQRS(读写分离)。这三种应用设计模式常见于领域驱动设计(DDD)中,但它们本身是一种应用设计的思想,不仅仅局限于DDD,每一种模式都可以单独拿出来使用。 E…...
基于Java+Spring+Mybsita+mysql的汽租车辆共享平台的设计源码+设计文档
文末获取源码数据库文档 感兴趣的可以先收藏,有毕设问题,项目以及论文撰写等问题都可以和博主沟通,尽最大努力帮助更多的人! 目录 1软件需求 1.1引言 1.1.1编写目的 1.1.2背景 1.2 绪论 1.2.1-Internet与…...
深度学习的正则化深入探讨
文章目录 一、说明二、学习目标三、什么是机器学习中的正则化四、了解过拟合和欠拟合五、代价函数的意义六、什么是偏差和方差?七、机器学习中的正则化? 一、说明 在训练机器学习模型时,模型很容易过拟合或欠拟合。为了避免这种情况…...
Token相关设计
文章目录 1. 双Token 机制概述1.1 访问令牌(Access Token)1.2 刷新令牌(Refresh Token) 2. 双Token 认证流程3. Spring Boot 具体实现3.1 生成 Token(使用 JWT)3.2 解析 Token3.3 登录接口(返回…...
【时序预测】在线学习:算法选择(从线性模型到深度学习解析)
——如何为动态时序预测匹配最佳增量学习策略? 引言:在线学习的核心价值与挑战 在动态时序预测场景中(如实时交通预测、能源消耗监控),数据以流式(Streaming)形式持续生成,且潜在的…...
React antd的datePicker自定义,封装成组件
一、antd的datePicker自定义 需求:用户需要为日期选择器的每个日期单元格添加一个Tooltip,当鼠标悬停时显示日期、可兑换流量余额和本公会可兑流量。这些数据需要从接口获取。我需要结合之前的代码,确保Tooltip正确显示,并且数据…...
学生管理前端
文章目录 首页student.html查询功能 首页 SpringBoot前端html页面放在static文件夹下:/src/main/resources/static 默认首页为index.html,我们可以用两个超链接或者两个button跳转到对应的页面。这里只是单纯的跳转页面,不需要提交表单等其…...
深入理解并实现自定义 unordered_map 和 unordered_set
亲爱的读者朋友们😃,此文开启知识盛宴与思想碰撞🎉。 快来参与讨论💬,点赞👍、收藏⭐、分享📤,共创活力社区。 在 C 的标准模板库(STL)中,unorder…...
顶顶通呼叫中心中间件(mod_cti基于FreeSWITCH)-大模型电话机器人
语音流直接对接Realtime API 多模态大模型 直接把音频流输出给大模型,大模型返回音频流。 顶顶通CTI对Realtime API 的支持 提供了以下2个APP可对接任意 •cti_audio_stream 通过TCP推流和播放流,适合用于人机对话场景。 •cti_unicast_start 通过旁…...
kinova机械臂绿色灯一闪一闪及刷机方法
一、背景 实验室有两个kinova mico机械臂,但经常出现操纵杆上的绿色灯一闪一闪的,导致无法使用操纵杆或ROS进行控制,下面给出官方的教程以及所需要的FS 0CPP 0008_6.2.5_mico_6dof.hex文件。 重要的东西写在前面: a、如果出现操…...
第16天:C++多线程完全指南 - 从基础到现代并发编程
第16天:C多线程完全指南 - 从基础到现代并发编程 一、多线程基础概念 1. 线程创建与管理(C11) #include <iostream> #include <thread>void hello() {std::cout << "Hello from thread " << std::this_…...
中科大计算机网络原理 1.5 Internt结构和ISP
一、互联网的层次化架构 覆盖范围分层 主干网(Tier-1级) 国家级或行业级核心网络,承担跨区域数据传输和全球互联功能。例如中国的四大主干网(ChinaNET、CERNET等)以及跨国运营商(如AT&T、Deuts…...
微信小程序之bind和catch
这两个呢,都是绑定事件用的,具体使用有些小区别。 官方文档: 事件冒泡处理不同 bind:绑定的事件会向上冒泡,即触发当前组件的事件后,还会继续触发父组件的相同事件。例如,有一个子视图绑定了b…...
UE5 学习系列(三)创建和移动物体
这篇博客是该系列的第三篇,是在之前两篇博客的基础上展开,主要介绍如何在操作界面中创建和拖动物体,这篇博客跟随的视频链接如下: B 站视频:s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...
【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...
DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...
VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP
编辑-虚拟网络编辑器-更改设置 选择桥接模式,然后找到相应的网卡(可以查看自己本机的网络连接) windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置,选择刚才配置的桥接模式 静态ip设置: 我用的ubuntu24桌…...
【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...
