四、RocketMQ发送普通消息、批量消息和延迟消息
Producer发送普通消息的方式
1.同步发送消息
同步消息代表发送端发送消息到broker之后,等待消息发送结果后,再次发送消息

实现步骤
- 创建生产端,声明在哪个生产组
- 注册NameServer地址
- 构建Message实体,指定topic、tag、body
- 启动生产端
- 发送消息
@Test
public void syncSend() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {// 1.创建生产端,声明在哪个生产组DefaultMQProducer producer = new DefaultMQProducer("test_group");// 2.注册NameServer地址producer.setNamesrvAddr(NAME_SERVER_ADDR);// 3.构建Message实体,指定topic、tag、bodyMessage message = new Message("test", "hello world".getBytes());// 4.启动生产端producer.start();// 5.发送消息SendResult sendResult = producer.send(message);System.out.println(sendResult.getSendStatus());
}
2.异步发送消息
异步消息代表发送端发送完消息后,会直接返回,但是可以注册一个回调函数,当broker将消息落盘后,回调这个回调函数

实现步骤
- 创建生产端,声明在哪个生产组
- 注册NameServer地址
- 构建Message实体,指定topic、tag、body
- 启动生产端
- 发送消息,并且实现SendCallback接口
注:这里必须等待异步返回,否则消费者无法消费成功
@Test
public void asyncSend() throws RemotingException, InterruptedException, MQClientException {DefaultMQProducer producer = new DefaultMQProducer("test_group");producer.setNamesrvAddr(NAME_SERVER_ADDR);Message message = new Message("test", "tag-a","hello world".getBytes());producer.start();CountDownLatch countDownLatch = new CountDownLatch(1);// 发送消息,并且实现SendCallback接口producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.println("发送成功:" + sendResult.getSendStatus());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.println("发送失败:" + e);}});countDownLatch.await();
}
3、发送单向消息
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短

实现步骤
- 创建生产端,声明在哪个生产组
- 注册NameServer地址
- 构建Message实体,指定topic、tag、body
- 启动生产端
- 发送单向消息
@Test
public void sendOneWay() throws RemotingException, InterruptedException, MQClientException {DefaultMQProducer producer = new DefaultMQProducer("test_group");producer.setNamesrvAddr(NAME_SERVER_ADDR);Message message = new Message("test","tag-a", "hello world".getBytes());producer.start();producer.sendOneway(message);
}
Producer发送批量消息
在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
@Test
public void sendBatch() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);// 构造批量消息List<Message> list = new ArrayList<>();list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world0".getBytes(Charset.defaultCharset())));list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world1".getBytes(Charset.defaultCharset())));list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world2".getBytes(Charset.defaultCharset())));producer.start();// 发送批量消息producer.send(list);producer.shutdown();
}
**注:**需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。
Producer发送延迟消息
Producer想要发送延迟消息,只要设置Message的DelayTimeLevel属性大于0即可。
RocketMQ无法随意设置延迟消息的延迟时间,只能根据延迟级别进行
延迟级别和延迟时间的对应关系
| 延迟级别 | 延迟时间 | 延迟级别 | 延迟时间 |
|---|---|---|---|
| 1 | 1s | 10 | 6min |
| 2 | 5s | 11 | 7min |
| 3 | 10s | 12 | 8min |
| 4 | 30s | 13 | 9min |
| 5 | 1min | 14 | 10min |
| 6 | 2min | 15 | 20min |
| 7 | 3min | 16 | 30min |
| 8 | 4min | 17 | 1h |
| 9 | 5min | 18 | 2h |
@Test
public void sendDelay() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);producer.start();Message message = new Message(RocketMQConfig.TEST_TOPIC, "hello world".getBytes(Charset.defaultCharset()));// 设置延迟级别message.setDelayTimeLevel(3);// 发送批量消息SendResult sendResult = producer.send(message);System.out.println(sendResult.getSendStatus());producer.shutdown();
}
延迟消息的原理
延迟消息并不会直接发送到指定的topic,而是发送到一个延迟消息对应的topic中
当延迟消息的时间到达后,在将消息发送到指定的topic中
延迟消息投递的流程
-
producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别
-
broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别的delayLevel-1
-
mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列
-
根据消费偏移量offset从commitLog中解析出对应消息
-
从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递
-
若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递
相关文章:
四、RocketMQ发送普通消息、批量消息和延迟消息
Producer发送普通消息的方式 1.同步发送消息 同步消息代表发送端发送消息到broker之后,等待消息发送结果后,再次发送消息 实现步骤 创建生产端,声明在哪个生产组注册NameServer地址构建Message实体,指定topic、tag、body启动…...
idea自定义 postfix completion提高编码效率
postfix completion的使用 详情见: https://www.cnblogs.com/expiator/p/17380495.html 自定义 postfix completion List、 String 初始化list: key: list表达式: List<$EXPR$> $END$List new ArrayList<>();字符串判空&…...
解锁学习电路设计的正确姿势!
...
【Linux】 ps命令使用
作为一个后端的程序员,我们经常用到ps -ef | grep XXX 到底什么事ps呢。 下面我们一起学习一下吧、 ps (英文全拼:process status)命令用于显示当前进程的状态,类似于 windows 的任务管理器。 ps命令 -Linux手册页 …...
打造高效的分布式爬虫系统:利用Scrapy框架实现
在大数据时代的今天,爬虫系统成为了获取和分析海量数据的重要工具。本文将介绍如何使用Scrapy框架来构建一个高效的分布式爬虫系统,以加速数据采集过程和提高系统的可扩展性。 Scrapy框架简介 Scrapy是一个基于Python的强大的开源网络爬虫框架ÿ…...
SpringCloud组件Ribbon的IRule的问题排查
最近很久没有写文章啦,刚好遇到了一个问题,其实问题也挺简单,但是还是得对源码有一定了解才能够发现。 最近在实现一个根据请求流量的标签,将请求转发到对应的节点,其实和俗称的灰度请求有点相似, 实现思…...
比较完整一些chatGPT项目代码(权威)
https://gitee.com/zccbbg/chatgpt-springboot-service yml中的配置文件无法读取,前端访问比较困难。...
Python - 生成二维码、条形码
二维码 import qrcode# 要生成的文本或链接 data "要生成的文本或链接"# 创建QR码对象 qr qrcode.QRCode(version1, # 版本号,通常设置为1error_correctionqrcode.constants.ERROR_CORRECT_L, # 错误修正级别box_size10, # 每个小方块的像素大小bor…...
8+纯生信,多组机器学习+分型探讨黑色素瘤发文思路。
今天给同学们分享一篇泛癌多组机器学习分型的生信文章“Comprehensive characterisation of immunogenic cell death in melanoma revealing the association with prognosis and tumor immune microenvironment”,这篇文章于2022年9月23日发表在Front Immunol 期刊…...
GPU高性能面试-写一个ReduceKernel
要求写一个reduceKernel 要求给出Kerne的完整调用: 1. 进行一维reduce 可以写一个最基础的,仅仅实现基础功能就行 使用share mem进行功能优化 使用shuffles指令完成block reduce操作 2.实现二维reduce...
深入探索STARK的安全性和可靠性——STARKs全面安全分析
1. 引言 non-interactive STARKs,起源于Interactive Oracle Proofs (IOPs),然后通过random oracle模式转换为非交互式。StarkWare团队 ethSTARK Documentation – Version 1.2(2023年7月)论文做了更新,给出了完整具体…...
WPF 控件分辨率自适应问题
WPF 控件分辨率自适应时,我首先想到的是使用ViewBox控件来做分辨率自适应。 ViewBox这个控件通常和其他控件结合起来使用,是WPF中非常有用的控件。定义一个内容容器。ViewBox组件的作用是拉伸或延展位于其中的组件,以填满可用空间࿰…...
CANoe创建仿真工程
CANoe创建仿真工程 写在前面仿真工程的创建创建工程添加CAN数据库添加系统变量创建面板创建网络节点为节点添加代码工程运行测试总结 写在前面 Canoe的安装不是特别方便,我是参加了松勤的培训课程,不仅需要安装软件还需要安装驱动,刚刚学习的…...
Scanner 输入回车跳不出循环的解决方法
题目要求: 输入一行内容包含字符串和数字,将字符串与数字分别提取。 解决方法: 可以使用两个Scanner对象,一个用来键入数据,另外一个用来对数据进行操作,以此来解决输入“回车”跳不出while循环的问题。 i…...
docker 启动 mysql 通过防火墙设置端口无法访问解决方案
1、问题描述:通过 docker compose 启动mysql服务,然而在防火墙添加了3306端口后却无法访问,但是关闭防火墙后又可以访问mysql数据库。 解决方案: 重启 docker 后解决:systemctl restart docker 如果没有解决问题则执…...
智能制造优化,RFID生产线管理系统解决方案
一、背景介绍 随着全球经济的发展,传统制造业面临着越来越高的成本和低利润的挑战,为了提升企业的整体利润率,优化管理流程成为必要的手段之一,在传统的制造企业中,生产线通常采用单件流生产模式,但这种模…...
【Mybatis】基于Mybatis插件+注解,实现敏感数据自动加解密
一、介绍 业务场景中经常会遇到诸如用户手机号,身份证号,银行卡号,邮箱,地址,密码等等信息,属于敏感信息,需要保存在数据库中。而很多公司会会要求对数据库中的此类数据进行加密存储。 敏感数据…...
【特纳斯电子】基于物联网的指纹密码锁系统设计-实物设计
资料下载链接:基于物联网的指纹密码锁系统设计-实物设计 - 电子校园网 编号: T3732205M-SW 设计简介: 本设计是基于单片机的指纹密码锁,主要实现以下功能: 1、可通过密码解锁 2、可通过云平台解锁 3、可通过指纹解…...
【牛客面试必刷TOP101】Day9.BM37 二叉搜索树的最近公共祖先和BM42 用两个栈实现队列
作者简介:大家好,我是未央; 博客首页:未央.303 系列专栏:牛客面试必刷TOP101 每日一句:人的一生,可以有所作为的时机只有一次,那就是现在!!!&…...
10.12 校招 实习 内推 面经
绿*泡*泡: neituijunsir 交流裙 ,内推/实习/校招汇总表格 1、校招 | 2024届秋招,美团哪些校招岗位最缺人?(内推) 校招 | 2024届秋招,美团哪些校招岗位最缺人?(内推&…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...
Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
Bean 作用域有哪些?如何答出技术深度?
导语: Spring 面试绕不开 Bean 的作用域问题,这是面试官考察候选人对 Spring 框架理解深度的常见方式。本文将围绕“Spring 中的 Bean 作用域”展开,结合典型面试题及实战场景,帮你厘清重点,打破模板式回答,…...
Golang——7、包与接口详解
包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...
掌握 HTTP 请求:理解 cURL GET 语法
cURL 是一个强大的命令行工具,用于发送 HTTP 请求和与 Web 服务器交互。在 Web 开发和测试中,cURL 经常用于发送 GET 请求来获取服务器资源。本文将详细介绍 cURL GET 请求的语法和使用方法。 一、cURL 基本概念 cURL 是 "Client URL" 的缩写…...
【LeetCode】算法详解#6 ---除自身以外数组的乘积
1.题目介绍 给定一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O…...
