当前位置: 首页 > news >正文

010 rocketmq批量消息

文章目录

  • 批量消息
  • BatchProducer.java
  • BatchConsumer.java

批量消息

批量发送可以提⾼发送性能,但有⼀定的限制:
topic 相同
waitStoreMsgOK 相同 (⾸先我们建设消息的iswaitstoremsgok=true(默认为true), 如果没有异常,我们将始终收到"OK",org.apache.rocketmq.common.message.Message#isWaitStoreMsgOK)
不支持延时发送
⼀批消息的大小不能⼤于 4M(DefaultMQProducer.maxMessageSize)
大小限制需要特殊注意,因为消息是动态的,不注意的话就可能超限,就会报错:
计算消息的大小
= (topic + body + (key + value) * N) * 吞吐量

int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();
}

BatchProducer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;public class BatchProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();String topic = "TopicTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));//then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {try {List<Message>  listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();//handle the error}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);//计算消息的大小 = (topic + body + (key + value) * N) * 吞吐量int tmpSize = message.getTopic().length() + message.getBody().length;//属性值的添加Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {//+key + valuetmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; //for log overheadif (tmpSize > SIZE_LIMIT) {//it is unexpected that single message exceeds the SIZE_LIMIT//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex == 0) {//if the next sublist has no element, add this one and then break, otherwise just breaknextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}

BatchConsumer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class BatchConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");// Subscribe one more more topics to consume.consumer.subscribe("TopicTest", "*");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}

相关文章:

010 rocketmq批量消息

文章目录 批量消息BatchProducer.javaBatchConsumer.java 批量消息 批量发送可以提⾼发送性能&#xff0c;但有⼀定的限制&#xff1a; topic 相同 waitStoreMsgOK 相同 &#xff08;⾸先我们建设消息的iswaitstoremsgoktrue(默认为true), 如果没有异常,我们将始终收到"O…...

JavaWeb后端基础(3)

原打算把Mysql操作数据库的一些知识写进去&#xff0c;但是感觉没必要&#xff0c;要是现在会的都是简单的增删改查&#xff0c;所以&#xff0c;这一篇&#xff0c;我直接从java操作数据库开始写&#xff0c;所以这一篇大致就是记一下JDBC、MyBatis、以及SpringBoot的配置文件…...

Oracle数据库基础入门(三): DQL 深入解析与实践

在 Oracle 数据库的知识体系中&#xff0c;数据查询语言&#xff08;DQL&#xff09;无疑是最为常用且关键的部分之一。对于 Java 全栈开发者而言&#xff0c;熟练掌握 DQL 不仅能高效地从数据库中获取所需数据&#xff0c;更是构建强大后端应用的基石。通过 DQL&#xff0c;我…...

P9231 [蓝桥杯 2023 省 A] 平方差

P9231 [蓝桥杯 2023 省 A] 平方差 - 洛谷 题目描述 给定 L,R&#xff0c;问 L≤x≤R 中有多少个数 x 满足存在整数 y,z 使得 xy2−z2。 输入格式 输入一行包含两个整数 L,R&#xff0c;用一个空格分隔。 输出格式 输出一行包含一个整数满足题目给定条件的 x 的数量。 输…...

贪心算法 求解思路

贪心算法简介 贪心算法是通过做一系列的选择来给出某一问题的最优解。对算法中的每一个决策点&#xff0c;做一个当时&#xff08;看起来是&#xff09;最佳的选择。这种启发式策略并不是总能产生出最优解&#xff0c;但它常常能给出最优解。 在实际设计贪心算法时&#xff0…...

2025/2/25,字节跳动后端开发一面面经

一、双方简单自我介绍 面试官先自我介绍,之后属于面试官看简历过程,基本不听。 二、实习中遇到最难的事情,怎么解决的 主要问的还是实习中做过的项目,项目难点在哪里(自己参与的地方),面对困难是怎么思考,怎么实际操作解决的。 三、项目实现细节 掌握自己项目的实…...

Buildroot 添加自定义模块-内置文件到文件系统

目录 概述实现步骤1. 创建包目录和文件结构2. 配置 Config.in3. 定义 cp_bin_files.mk4. 添加源文件install.shmy.conf 5. 配置与编译 概述 Buildroot 是一个高度可定制和模块化的嵌入式 Linux 构建系统&#xff0c;适用于从简单到复杂的各种嵌入式项目. buildroot的源码中bui…...

SpringBoot新闻推荐系统设计与实现

随着信息时代的快速发展&#xff0c;新闻推荐系统成为用户获取个性化内容的重要工具。本文将介绍一个幽络源的基于SpringBoot开发的新闻推荐系统&#xff0c;该系统功能全面&#xff0c;操作简便&#xff0c;能够满足管理员和用户的多种需求。 管理员模块 管理员模块为系统管…...

领域驱动设计:事件溯源架构简介

概述 事件溯源架构通常由3种应用设计模式组成,分别是:事件驱动(Event Driven),事件溯源(Event Source)、CQRS(读写分离)。这三种应用设计模式常见于领域驱动设计(DDD)中,但它们本身是一种应用设计的思想,不仅仅局限于DDD,每一种模式都可以单独拿出来使用。 E…...

基于Java+Spring+Mybsita+mysql的汽租车辆共享平台的设计源码+设计文档

文末获取源码数据库文档 感兴趣的可以先收藏&#xff0c;有毕设问题&#xff0c;项目以及论文撰写等问题都可以和博主沟通&#xff0c;尽最大努力帮助更多的人&#xff01; 目录 1软件需求 1.1引言 1.1.1编写目的 1.1.2背景 1.2 绪论 1.2.1&#xff0d;Internet与…...

深度学习的正则化深入探讨

文章目录 一、说明二、学习目标三、什么是机器学习中的正则化四、了解过拟合和欠拟合五、代价函数的意义六、什么是偏差和方差&#xff1f;七、机器学习中的正则化&#xff1f; 一、说明 在训练机器学习模型时&#xff0c;模型很容易过拟合或欠拟合。为了避免这种情况&#xf…...

Token相关设计

文章目录 1. 双Token 机制概述1.1 访问令牌&#xff08;Access Token&#xff09;1.2 刷新令牌&#xff08;Refresh Token&#xff09; 2. 双Token 认证流程3. Spring Boot 具体实现3.1 生成 Token&#xff08;使用 JWT&#xff09;3.2 解析 Token3.3 登录接口&#xff08;返回…...

【时序预测】在线学习:算法选择(从线性模型到深度学习解析)

——如何为动态时序预测匹配最佳增量学习策略&#xff1f; 引言&#xff1a;在线学习的核心价值与挑战 在动态时序预测场景中&#xff08;如实时交通预测、能源消耗监控&#xff09;&#xff0c;数据以流式&#xff08;Streaming&#xff09;形式持续生成&#xff0c;且潜在的…...

React antd的datePicker自定义,封装成组件

一、antd的datePicker自定义 需求&#xff1a;用户需要为日期选择器的每个日期单元格添加一个Tooltip&#xff0c;当鼠标悬停时显示日期、可兑换流量余额和本公会可兑流量。这些数据需要从接口获取。我需要结合之前的代码&#xff0c;确保Tooltip正确显示&#xff0c;并且数据…...

学生管理前端

文章目录 首页student.html查询功能 首页 SpringBoot前端html页面放在static文件夹下&#xff1a;/src/main/resources/static 默认首页为index.html&#xff0c;我们可以用两个超链接或者两个button跳转到对应的页面。这里只是单纯的跳转页面&#xff0c;不需要提交表单等其…...

深入理解并实现自定义 unordered_map 和 unordered_set

亲爱的读者朋友们&#x1f603;&#xff0c;此文开启知识盛宴与思想碰撞&#x1f389;。 快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 在 C 的标准模板库&#xff08;STL&#xff09;中&#xff0c;unorder…...

顶顶通呼叫中心中间件(mod_cti基于FreeSWITCH)-大模型电话机器人

语音流直接对接Realtime API 多模态大模型 直接把音频流输出给大模型&#xff0c;大模型返回音频流。 顶顶通CTI对Realtime API 的支持 提供了以下2个APP可对接任意 •cti_audio_stream 通过TCP推流和播放流&#xff0c;适合用于人机对话场景。 •cti_unicast_start 通过旁…...

kinova机械臂绿色灯一闪一闪及刷机方法

一、背景 实验室有两个kinova mico机械臂&#xff0c;但经常出现操纵杆上的绿色灯一闪一闪的&#xff0c;导致无法使用操纵杆或ROS进行控制&#xff0c;下面给出官方的教程以及所需要的FS 0CPP 0008_6.2.5_mico_6dof.hex文件。 重要的东西写在前面&#xff1a; a、如果出现操…...

第16天:C++多线程完全指南 - 从基础到现代并发编程

第16天&#xff1a;C多线程完全指南 - 从基础到现代并发编程 一、多线程基础概念 1. 线程创建与管理&#xff08;C11&#xff09; #include <iostream> #include <thread>void hello() {std::cout << "Hello from thread " << std::this_…...

中科大计算机网络原理 1.5 Internt结构和ISP

一、互联网的层次化架构 ‌覆盖范围分层‌ ‌主干网&#xff08;Tier-1级&#xff09;‌ 国家级或行业级核心网络&#xff0c;承担跨区域数据传输和全球互联功能。例如中国的四大主干网&#xff08;ChinaNET、CERNET等&#xff09;以及跨国运营商&#xff08;如AT&T、Deuts…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis&#xff1f;2.为什么要使用redis作为mysql的缓存&#xff1f;3.什么是缓存雪崩、缓存穿透、缓存击穿&#xff1f;3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

go 里面的指针

指针 在 Go 中&#xff0c;指针&#xff08;pointer&#xff09;是一个变量的内存地址&#xff0c;就像 C 语言那样&#xff1a; a : 10 p : &a // p 是一个指向 a 的指针 fmt.Println(*p) // 输出 10&#xff0c;通过指针解引用• &a 表示获取变量 a 的地址 p 表示…...

云安全与网络安全:核心区别与协同作用解析

在数字化转型的浪潮中&#xff0c;云安全与网络安全作为信息安全的两大支柱&#xff0c;常被混淆但本质不同。本文将从概念、责任分工、技术手段、威胁类型等维度深入解析两者的差异&#xff0c;并探讨它们的协同作用。 一、核心区别 定义与范围 网络安全&#xff1a;聚焦于保…...

如何在Windows本机安装Python并确保与Python.NET兼容

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…...

Monorepo架构: Nx Cloud 扩展能力与缓存加速

借助 Nx Cloud 实现项目协同与加速构建 1 &#xff09; 缓存工作原理分析 在了解了本地缓存和远程缓存之后&#xff0c;我们来探究缓存是如何工作的。以计算文件的哈希串为例&#xff0c;若后续运行任务时文件哈希串未变&#xff0c;系统会直接使用对应的输出和制品文件。 2 …...

CSS3相关知识点

CSS3相关知识点 CSS3私有前缀私有前缀私有前缀存在的意义常见浏览器的私有前缀 CSS3基本语法CSS3 新增长度单位CSS3 新增颜色设置方式CSS3 新增选择器CSS3 新增盒模型相关属性box-sizing 怪异盒模型resize调整盒子大小box-shadow 盒子阴影opacity 不透明度 CSS3 新增背景属性ba…...

【51单片机】4. 模块化编程与LCD1602Debug

1. 什么是模块化编程 传统编程会将所有函数放在main.c中&#xff0c;如果使用的模块多&#xff0c;一个文件内会有很多代码&#xff0c;不利于组织和管理 模块化编程则是将各个模块的代码放在不同的.c文件里&#xff0c;在.h文件里提供外部可调用函数声明&#xff0c;其他.c文…...

高保真组件库:开关

一:制作关状态 拖入一个矩形作为关闭的底色:44 x 22,填充灰色CCCCCC,圆角23,边框宽度0,文本为”关“,右对齐,边距2,2,6,2,文本颜色白色FFFFFF。 拖拽一个椭圆,尺寸18 x 18,边框为0。3. 全选转为动态面板状态1命名为”关“。 二:制作开状态 复制关状态并命名为”开…...