RocketMQ笔记(八)SpringBoot整合RocketMQ广播消费消息
目录
- 一、简介
- 1.1、消费模式
- 二、消费者
- 2.1、maven依赖
- 2.2、application配置
- 2.3、消费监听
- 三、生产者
- 3.1、发送消息
- 3.2、运行结果
- 四、其他
一、简介
在之前的文章中,我们讲过了,同步发送单条消息,异步发送单条消息,发送单向消息,发送顺序消息,批量发送消息,事务消息,我们使用的模式都是 集群消费模式(Cluster),本文就来讲另外一种消息消费模式,也就是广播消费模式(Broadcast)
1.1、消费模式
在 Apache RocketMQ 中,实现消息消费的方式主要是两种:
-
集群消费模式(Cluster):
在集群消费模式下,同一个消费者组(Consumer Group)中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上,但是同一个消息只会被同一个消费者组中的一个消费者消费。 -
广播消费模式(Broadcast):
在广播消费模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,即每个消费者都会独立地消费消息。消息会被广播到同一个消费者组中的所有消费者实例上。
那么怎么使用广播消费模式呢?其实很简单,通过在消费者的 @RocketMQMessageListener 注解中设置 messageModel 参数为 MessageModel.BROADCASTING,即可将消费者设置为广播模式。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,从而实现了消息的广播消费。接下里看看具体操作吧。
二、消费者
2.1、maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq</artifactId><groupId>com.alian</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>11-broadcasting-message-one</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties></project>
2.2、application配置
application.properties
server.port=8011# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的消费者组
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 批量拉取消息的数量
rocketmq.consumer.pull-batch-size=10
# 广播消费模式
rocketmq.consumer.message-model=BROADCASTING
实际上对于本文来说,下面两个配置不用配置,也不会生效。
# 默认的消费者组
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 广播消费模式
rocketmq.consumer.message-model=BROADCASTING
因为优先的是@RocketMQMessageListener 注解中设置 consumerGroup 和messageModel 参数。
2.3、消费监听
@RocketMQMessageListener是RocketMQ提供的注解,用于配置消费者监听器的相关属性。
package com.alian.broadcasting;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
@RocketMQMessageListener(topic = "broadcasting_string_message_topic",consumerGroup = "BROADCASTING_CONSUMER_GROUP",messageModel = MessageModel.BROADCASTING)
public class StringMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("第一个消费者接收到的字符串消息: {}", message);// 处理消息的业务逻辑}
}
关于这里@RocketMQMessageListener的参数做个简单解释:
- topic:必填,指定该消费者订阅的Topic名称
- consumerGroup:必填,指定该消费者所属的消费者组名称,同一个组内的消费者实例通常进行负载均衡消费
- messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
MessageModel.java
public enum MessageModel {BROADCASTING("BROADCASTING"),CLUSTERING("CLUSTERING");private final String modeCN;MessageModel(String modeCN) {this.modeCN = modeCN;}public String getModeCN() {return this.modeCN;}
}
三、生产者
生产者我就复用前面批量消息发送的模块了
3.1、发送消息
@Slf4j
@SpringBootTest
public class SendBatchedBroadcastingMessageTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void syncSendStringMessagesWithBuilder() {String topic = "broadcasting_string_message_topic";for (int i = 0; i < 10; i++) {String message = "广播消息:" + i;Message<String> rocketMessage = MessageBuilder.withPayload(message).build();rocketMQTemplate.convertAndSend(topic, rocketMessage);}}@Testpublic void syncSendBatchStringMessagesWithBuilder() {String topic = "string_message_topic";String message = "批量广播消息:";List<Message<String>> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> rocketMessage = MessageBuilder.withPayload(message + i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSend发送批量消息SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);log.info("批量消息发送结果:{}",sendResult);}@AfterEachpublic void waiting() {try {Thread.sleep(3000L);} catch (InterruptedException e) {e.printStackTrace();}}}
我们先启动消费者,然后生产者发送消息。
3.2、运行结果
运行结果:
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:1
[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:0
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:3
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:2
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:9
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:0
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:2
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:4
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:5
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:3
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:1
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:6
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:9
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:8[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:0
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:1
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:2
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:3
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:9
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:4
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:6
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:2
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:3
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:8
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:1
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:0
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:9
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:5
四、其他
RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者;如果消费者重启或重新加入组,就能从对应Queue的offset处继续消费。
不过使用广播消费模式时,Consumer Group 的概念基本上没有作用,因为每个消费者实例都会独立地收到消息的一个副本。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,而不像集群消费模式中那样,一个消费者组中的消费者会共同消费消息。
广播消费模式在RocketMQ中最好的好处就是消费者解耦:不同的消费者可以独立消费消息,相互之间不受影响,提高了系统的扩展性,它的适用场景有:
- 日志收集 - 需要将日志数据分发给多个日志收集系统,每个系统都需要收到全量日志。
- 数据备份 - 实时备份数据到多个存储系统,确保数据有冗余副本。
- 信息推送 - 向多个推送通道投递并发送消息通知,如站内信、短信、Push等。
- 状态同步 - 将数据变更实时同步到集群的所有节点,保证集群节点状态一致。
- 负载均衡 - 将任务或请求广播给所有服务实例,由每个实例独立处理,实现负载分担。
- 监控告警 - 将系统监控数据广播给多个监控系统,多视角分析。
相关文章:
RocketMQ笔记(八)SpringBoot整合RocketMQ广播消费消息
目录 一、简介1.1、消费模式 二、消费者2.1、maven依赖2.2、application配置2.3、消费监听 三、生产者3.1、发送消息3.2、运行结果 四、其他 一、简介 在之前的文章中,我们讲过了,同步发送单条消息,异步发送单条消息,发送单向消息…...

Appium如何自动判断浏览器驱动
问题:有的测试机chrome是这个版本,有的是另一个版本,怎么能让自动判断去跑呢?? 解决办法:使用appium的chromedriverExecutableDir和chromedriverChromeMappingFile 切忌使用chromedriverExecutableDir和c…...
MVCC-多版本并发控制
MVCC(多版本并发控制)简介 在数据库系统中,并发控制是一个非常重要的话题。为了提高系统的并发性能和吞吐量,现代数据库系统通常使用多种技术来实现对数据的安全访问,其中一种重要的技术就是多版本并发控制࿰…...
c++找最高成绩
根据给定的程序,写成相关的成员函数,完成指定功能。 函数接口定义: 定义max函数,实现输出最高成绩对应的学号以及最高成绩值。 裁判测试程序样例: #include <iostream> using namespace std; class Student{…...
前端saas化部署
在项目中难免会遇到一些特殊的需求,例如同一套代码需要同时部署上两个不同的域名A和B。A和B的不同之处仅在于,例如一些背景图片,logo,展示模块的不同,其他业务逻辑是和展示模块是完全一样的。此时我们当然可以考虑单独…...

[Java基础揉碎]Math类
目录 基本介绍 方法一览(均为静态方法) 1) abs 绝对值 2) pow 求幂 3) ceil 向上取整 4) floor 向下取整 5) round 四舍五入 6) sqrt 求开方 7) random求随机数 8) max 求两个数的最大值 9) min 求两个数的最小值 基本介绍 Math类包含用于执行基本数学运算的方法&…...

MyBatis输入映射
1 parameterType parameterType:接口中方法参数的类型,类型必须是完全限定名或别名(稍后讲别名)。该属性非必须,因为Mybatis框架能自行判断具体传入语句的参数,默认值为未设置(unset)。<sel…...
金三银四,程序员求职季
随着春天的脚步渐近,对于许多程序员来说,一年中最繁忙、最重要的面试季节也随之而来。金三银四,即三月和四月,被广大程序员视为求职的黄金时期。在这两个月里,各大公司纷纷开放招聘,求职者们则通过一轮又一…...

[react优化] 避免组件或数据多次渲染/计算
代码如下 点击视图x➕1,导致视图更新, 视图更细导致a也重新大量计算!!这很浪费时间 function App() {const [x, setX] useState(3)const y x 2console.log(重新渲染, x, y);console.time(timer)let a 0for (let index 0; index < 1000000000; index) {a}console.timeE…...

「意」起出发 丨意大利OXO城市展厅盛大启幕,成都设计圈共襄盛举
4月8日,主题为“「意」起出发「智」见OXO”的意大利OXO城市展厅发布会在成都大悦城OXO成都城市展厅隆重举办。 大会现场,成都装饰协会领导,喜尔康董事长吴锡山,天合智能副董事长罗洁,意大利OXO卫浴市场部总监兰彬&…...
你不知道的JavaScript---深入理解 JavaScript 作用域
你好,我是小白Coding日志,一个热爱技术的程序员。在这里,我分享自己在编程和技术世界中的学习心得和体会。希望我的文章能够给你带来一些灵感和帮助。欢迎来到我的博客,一起在技术的世界里探索前行吧! 1. 什么是作用域…...

FPGA(Verilog)实现按键消抖
实现按键消抖功能: 1.滤除按键按下时的噪声和松开时的噪声信号。 2.获取已消抖的按键按下的标志信号。 3.实现已消抖的按键的连续功能。 Verilog实现 模块端口 key_filter(input wire clk ,input wire rst_n ,input wire key_in , //按下按键时为0output …...

第十二届蓝桥杯大赛软件赛省赛C/C++大学B组
第十二届蓝桥杯大赛软件赛省赛C/C 大学 B 组 文章目录 第十二届蓝桥杯大赛软件赛省赛C/C 大学 B 组1、空间2、卡片3、直线4、货物摆放5、路径6、时间显示7、砝码称重8、杨辉三角形9、双向排序10、括号序列 1、空间 1MB 1024KB 1KB 1024byte 1byte8bit // cout<<"2…...
面了钉钉搜广增算法岗(暑期实习),秒挂。。。。
节前,我们星球组织了一场算法岗技术&面试讨论会,邀请了一些互联网大厂朋友、参加社招和校招面试的同学,针对算法岗技术趋势、大模型落地项目经验分享、新手如何入门算法岗、该如何准备、面试常考点分享等热门话题进行了深入的讨论。 汇总…...
前端实现流文件下载的完整指南
在现代Web开发中,经常会遇到需要从服务器下载文件的情况。有时候这些文件是事先存储好的,可以通过简单的URL链接直接下载;但有时候,我们需要从数据流中动态生成文件并将其提供给用户。本篇博客将介绍如何在前端实现流文件下载的完…...

Kotlin:常用标准库函数(let、run、with、apply、also)
一、let 扩展函数 Kotlin标准库函数let可用于范围确定和空检查。当调用对象时,let执行给定的代码块并返回其最后一个表达式的结果。对象可以通过引用(默认情况下)或自定义名称在块中访问。 let扩展函数源码 let.kt文件代码 fun main() {println("isEmpty $is…...
雷军给年轻人的五点建议
前言 拿来激励自己,没事就看一看,给自己高一点的要求. 致刚入门的程序员五点建议 每个IT企业,尤其是初创企业,非常苦恼:找不到好的程序员。现在大学、软件学院及各种培训机构,每年培养几十万的程序员,毕业的每个人都…...

Unity DOTS物理引擎的核心分析与详解
最近DOTS发布了正式的版本,同时基于DOTS的理念实现了一套高性能的物理引擎,今天我们来给大家分享和介绍一下这个物理引擎的使用。 Unity.Physics的设计哲学 Unity.Physics是基于DOTS设计思想的一个高性能C#物理引擎的实现, 包含了物理刚体的迭代计算与碰撞检测等查…...
C++ //练习 12.4 在我们的check函数中,没有检查i是否大于0。为什么可以忽略这个检查?
C Primer(第5版) 练习 12.4 练习 12.4 在我们的check函数中,没有检查i是否大于0。为什么可以忽略这个检查? 环境:Linux Ubuntu(云服务器) 工具:vim 解释 size_type类型是无符号整…...

达梦备份与恢复
达梦备份与恢复 基础环境 操作系统:Red Hat Enterprise Linux Server release 7.9 (Maipo) 数据库版本:DM Database Server 64 V8 架构:单实例1 设置bak_path路径 --创建备份文件存放目录 su - dmdba mkdir -p /dm8/backup--修改dm.ini 文件…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验
一、多模态商品数据接口的技术架构 (一)多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如,当用户上传一张“蓝色连衣裙”的图片时,接口可自动提取图像中的颜色(RGB值&…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明
AI 领域的快速发展正在催生一个新时代,智能代理(agents)不再是孤立的个体,而是能够像一个数字团队一样协作。然而,当前 AI 生态系统的碎片化阻碍了这一愿景的实现,导致了“AI 巴别塔问题”——不同代理之间…...

【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...

ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”
2025年#高考 将在近日拉开帷幕,#AI 监考一度冲上热搜。当AI深度融入高考,#时间同步 不再是辅助功能,而是决定AI监考系统成败的“生命线”。 AI亮相2025高考,40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕,江西、…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...

C# 表达式和运算符(求值顺序)
求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如,已知表达式3*52,依照子表达式的求值顺序,有两种可能的结果,如图9-3所示。 如果乘法先执行,结果是17。如果5…...