RocketMQ笔记(八)SpringBoot整合RocketMQ广播消费消息
目录
- 一、简介
- 1.1、消费模式
- 二、消费者
- 2.1、maven依赖
- 2.2、application配置
- 2.3、消费监听
- 三、生产者
- 3.1、发送消息
- 3.2、运行结果
- 四、其他
一、简介
在之前的文章中,我们讲过了,同步发送单条消息,异步发送单条消息,发送单向消息,发送顺序消息,批量发送消息,事务消息,我们使用的模式都是 集群消费模式(Cluster),本文就来讲另外一种消息消费模式,也就是广播消费模式(Broadcast)
1.1、消费模式
在 Apache RocketMQ 中,实现消息消费的方式主要是两种:
-
集群消费模式(Cluster):
在集群消费模式下,同一个消费者组(Consumer Group)中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上,但是同一个消息只会被同一个消费者组中的一个消费者消费。 -
广播消费模式(Broadcast):
在广播消费模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,即每个消费者都会独立地消费消息。消息会被广播到同一个消费者组中的所有消费者实例上。
那么怎么使用广播消费模式呢?其实很简单,通过在消费者的 @RocketMQMessageListener 注解中设置 messageModel 参数为 MessageModel.BROADCASTING,即可将消费者设置为广播模式。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,从而实现了消息的广播消费。接下里看看具体操作吧。
二、消费者
2.1、maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq</artifactId><groupId>com.alian</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>11-broadcasting-message-one</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties></project>
2.2、application配置
application.properties
server.port=8011# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的消费者组
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 批量拉取消息的数量
rocketmq.consumer.pull-batch-size=10
# 广播消费模式
rocketmq.consumer.message-model=BROADCASTING
实际上对于本文来说,下面两个配置不用配置,也不会生效。
# 默认的消费者组
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 广播消费模式
rocketmq.consumer.message-model=BROADCASTING
因为优先的是@RocketMQMessageListener 注解中设置 consumerGroup 和messageModel 参数。
2.3、消费监听
@RocketMQMessageListener是RocketMQ提供的注解,用于配置消费者监听器的相关属性。
package com.alian.broadcasting;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
@RocketMQMessageListener(topic = "broadcasting_string_message_topic",consumerGroup = "BROADCASTING_CONSUMER_GROUP",messageModel = MessageModel.BROADCASTING)
public class StringMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("第一个消费者接收到的字符串消息: {}", message);// 处理消息的业务逻辑}
}
关于这里@RocketMQMessageListener的参数做个简单解释:
- topic:必填,指定该消费者订阅的Topic名称
- consumerGroup:必填,指定该消费者所属的消费者组名称,同一个组内的消费者实例通常进行负载均衡消费
- messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
MessageModel.java
public enum MessageModel {BROADCASTING("BROADCASTING"),CLUSTERING("CLUSTERING");private final String modeCN;MessageModel(String modeCN) {this.modeCN = modeCN;}public String getModeCN() {return this.modeCN;}
}
三、生产者
生产者我就复用前面批量消息发送的模块了
3.1、发送消息
@Slf4j
@SpringBootTest
public class SendBatchedBroadcastingMessageTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void syncSendStringMessagesWithBuilder() {String topic = "broadcasting_string_message_topic";for (int i = 0; i < 10; i++) {String message = "广播消息:" + i;Message<String> rocketMessage = MessageBuilder.withPayload(message).build();rocketMQTemplate.convertAndSend(topic, rocketMessage);}}@Testpublic void syncSendBatchStringMessagesWithBuilder() {String topic = "string_message_topic";String message = "批量广播消息:";List<Message<String>> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> rocketMessage = MessageBuilder.withPayload(message + i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSend发送批量消息SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);log.info("批量消息发送结果:{}",sendResult);}@AfterEachpublic void waiting() {try {Thread.sleep(3000L);} catch (InterruptedException e) {e.printStackTrace();}}}
我们先启动消费者,然后生产者发送消息。
3.2、运行结果
运行结果:
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:1
[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:0
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:3
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:2
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:9
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:0
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:2
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:4
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:5
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:3
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:1
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:6
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:9
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:8[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:0
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:1
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:2
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:3
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:9
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:4
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:6
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:2
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:3
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:8
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:1
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:0
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:9
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:5
四、其他
RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者;如果消费者重启或重新加入组,就能从对应Queue的offset处继续消费。
不过使用广播消费模式时,Consumer Group 的概念基本上没有作用,因为每个消费者实例都会独立地收到消息的一个副本。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,而不像集群消费模式中那样,一个消费者组中的消费者会共同消费消息。
广播消费模式在RocketMQ中最好的好处就是消费者解耦:不同的消费者可以独立消费消息,相互之间不受影响,提高了系统的扩展性,它的适用场景有:
- 日志收集 - 需要将日志数据分发给多个日志收集系统,每个系统都需要收到全量日志。
- 数据备份 - 实时备份数据到多个存储系统,确保数据有冗余副本。
- 信息推送 - 向多个推送通道投递并发送消息通知,如站内信、短信、Push等。
- 状态同步 - 将数据变更实时同步到集群的所有节点,保证集群节点状态一致。
- 负载均衡 - 将任务或请求广播给所有服务实例,由每个实例独立处理,实现负载分担。
- 监控告警 - 将系统监控数据广播给多个监控系统,多视角分析。
相关文章:

RocketMQ笔记(八)SpringBoot整合RocketMQ广播消费消息
目录 一、简介1.1、消费模式 二、消费者2.1、maven依赖2.2、application配置2.3、消费监听 三、生产者3.1、发送消息3.2、运行结果 四、其他 一、简介 在之前的文章中,我们讲过了,同步发送单条消息,异步发送单条消息,发送单向消息…...

Appium如何自动判断浏览器驱动
问题:有的测试机chrome是这个版本,有的是另一个版本,怎么能让自动判断去跑呢?? 解决办法:使用appium的chromedriverExecutableDir和chromedriverChromeMappingFile 切忌使用chromedriverExecutableDir和c…...

MVCC-多版本并发控制
MVCC(多版本并发控制)简介 在数据库系统中,并发控制是一个非常重要的话题。为了提高系统的并发性能和吞吐量,现代数据库系统通常使用多种技术来实现对数据的安全访问,其中一种重要的技术就是多版本并发控制࿰…...

c++找最高成绩
根据给定的程序,写成相关的成员函数,完成指定功能。 函数接口定义: 定义max函数,实现输出最高成绩对应的学号以及最高成绩值。 裁判测试程序样例: #include <iostream> using namespace std; class Student{…...

前端saas化部署
在项目中难免会遇到一些特殊的需求,例如同一套代码需要同时部署上两个不同的域名A和B。A和B的不同之处仅在于,例如一些背景图片,logo,展示模块的不同,其他业务逻辑是和展示模块是完全一样的。此时我们当然可以考虑单独…...

[Java基础揉碎]Math类
目录 基本介绍 方法一览(均为静态方法) 1) abs 绝对值 2) pow 求幂 3) ceil 向上取整 4) floor 向下取整 5) round 四舍五入 6) sqrt 求开方 7) random求随机数 8) max 求两个数的最大值 9) min 求两个数的最小值 基本介绍 Math类包含用于执行基本数学运算的方法&…...

MyBatis输入映射
1 parameterType parameterType:接口中方法参数的类型,类型必须是完全限定名或别名(稍后讲别名)。该属性非必须,因为Mybatis框架能自行判断具体传入语句的参数,默认值为未设置(unset)。<sel…...

金三银四,程序员求职季
随着春天的脚步渐近,对于许多程序员来说,一年中最繁忙、最重要的面试季节也随之而来。金三银四,即三月和四月,被广大程序员视为求职的黄金时期。在这两个月里,各大公司纷纷开放招聘,求职者们则通过一轮又一…...

[react优化] 避免组件或数据多次渲染/计算
代码如下 点击视图x➕1,导致视图更新, 视图更细导致a也重新大量计算!!这很浪费时间 function App() {const [x, setX] useState(3)const y x 2console.log(重新渲染, x, y);console.time(timer)let a 0for (let index 0; index < 1000000000; index) {a}console.timeE…...

「意」起出发 丨意大利OXO城市展厅盛大启幕,成都设计圈共襄盛举
4月8日,主题为“「意」起出发「智」见OXO”的意大利OXO城市展厅发布会在成都大悦城OXO成都城市展厅隆重举办。 大会现场,成都装饰协会领导,喜尔康董事长吴锡山,天合智能副董事长罗洁,意大利OXO卫浴市场部总监兰彬&…...

你不知道的JavaScript---深入理解 JavaScript 作用域
你好,我是小白Coding日志,一个热爱技术的程序员。在这里,我分享自己在编程和技术世界中的学习心得和体会。希望我的文章能够给你带来一些灵感和帮助。欢迎来到我的博客,一起在技术的世界里探索前行吧! 1. 什么是作用域…...

FPGA(Verilog)实现按键消抖
实现按键消抖功能: 1.滤除按键按下时的噪声和松开时的噪声信号。 2.获取已消抖的按键按下的标志信号。 3.实现已消抖的按键的连续功能。 Verilog实现 模块端口 key_filter(input wire clk ,input wire rst_n ,input wire key_in , //按下按键时为0output …...

第十二届蓝桥杯大赛软件赛省赛C/C++大学B组
第十二届蓝桥杯大赛软件赛省赛C/C 大学 B 组 文章目录 第十二届蓝桥杯大赛软件赛省赛C/C 大学 B 组1、空间2、卡片3、直线4、货物摆放5、路径6、时间显示7、砝码称重8、杨辉三角形9、双向排序10、括号序列 1、空间 1MB 1024KB 1KB 1024byte 1byte8bit // cout<<"2…...

面了钉钉搜广增算法岗(暑期实习),秒挂。。。。
节前,我们星球组织了一场算法岗技术&面试讨论会,邀请了一些互联网大厂朋友、参加社招和校招面试的同学,针对算法岗技术趋势、大模型落地项目经验分享、新手如何入门算法岗、该如何准备、面试常考点分享等热门话题进行了深入的讨论。 汇总…...

前端实现流文件下载的完整指南
在现代Web开发中,经常会遇到需要从服务器下载文件的情况。有时候这些文件是事先存储好的,可以通过简单的URL链接直接下载;但有时候,我们需要从数据流中动态生成文件并将其提供给用户。本篇博客将介绍如何在前端实现流文件下载的完…...

Kotlin:常用标准库函数(let、run、with、apply、also)
一、let 扩展函数 Kotlin标准库函数let可用于范围确定和空检查。当调用对象时,let执行给定的代码块并返回其最后一个表达式的结果。对象可以通过引用(默认情况下)或自定义名称在块中访问。 let扩展函数源码 let.kt文件代码 fun main() {println("isEmpty $is…...

雷军给年轻人的五点建议
前言 拿来激励自己,没事就看一看,给自己高一点的要求. 致刚入门的程序员五点建议 每个IT企业,尤其是初创企业,非常苦恼:找不到好的程序员。现在大学、软件学院及各种培训机构,每年培养几十万的程序员,毕业的每个人都…...

Unity DOTS物理引擎的核心分析与详解
最近DOTS发布了正式的版本,同时基于DOTS的理念实现了一套高性能的物理引擎,今天我们来给大家分享和介绍一下这个物理引擎的使用。 Unity.Physics的设计哲学 Unity.Physics是基于DOTS设计思想的一个高性能C#物理引擎的实现, 包含了物理刚体的迭代计算与碰撞检测等查…...

C++ //练习 12.4 在我们的check函数中,没有检查i是否大于0。为什么可以忽略这个检查?
C Primer(第5版) 练习 12.4 练习 12.4 在我们的check函数中,没有检查i是否大于0。为什么可以忽略这个检查? 环境:Linux Ubuntu(云服务器) 工具:vim 解释 size_type类型是无符号整…...

达梦备份与恢复
达梦备份与恢复 基础环境 操作系统:Red Hat Enterprise Linux Server release 7.9 (Maipo) 数据库版本:DM Database Server 64 V8 架构:单实例1 设置bak_path路径 --创建备份文件存放目录 su - dmdba mkdir -p /dm8/backup--修改dm.ini 文件…...

iOS App Store审核要求与Flutter应用的兼容性分析
本文探讨了使用Flutter开发的iOS应用能否上架,以及上架的具体流程。苹果提供了App Store作为正式上架渠道,同时也有TestFlight供开发者进行内测。合规并通过审核后,Flutter应用可以顺利上架。但上架过程可能存在一些挑战,因此可能…...

javaScript常见对象方法总结
1,object.assign() 用于合并对象的属性。它可以将一个或多个源对象的属性复制到目标对象中,实现属性的合并。 语法 Object.assign(target, ...sources); 1,target:目标对象,将属性复制到该对象中。 2,sources:一个…...

使用Java流API构建树形结构数据
简介: 在实际开发中,构建树状层次结构是常见需求,如组织架构、目录结构或菜单系统。本教案通过解析给定的Java代码,展示如何使用Java 8 Stream API将扁平化的菜单数据转换为具有层级关系的树形结构。 1. 核心类定义 - Menu Data…...

蓝桥杯备考
1.1 输入输出 cin/cout scanf/printf 万能头文件 #include<bits/stdc.h> cin/cout 速度相对慢,需要关同步,代码如下 #include<bits/stdc.h> using namespace std; int main(){ios::sync_with_stdio(0);cin.tie(0);cout.tie(0);int x,y;cin>>x…...

Linux云计算之Linux基础1——操作系统理论基础
目录 1、UNIX 的诞生和广泛使用 2、CPU 架构类型 3、CPU 指令 4、计算机程序设计和执行过程 5、操作统OS 6、编程层次 7、程序的内部运行接口 8、UI程序接口(人机交互接口) 9、程序的运行模式: 10、POSIX:可移植操作系统规范 11、计算机开源领域 12、Li…...

大模型从入门到应用——OpenAI基础调用
摘要:这是OpenAI的基本调用,通过文章了解大模型的一个基础使用 1. 调用说明 在大型语言模型(LLM)的应用中,OpenAI的基础调用是入门的关键一步。通过调用OpenAI的API,我们可以利用其强大的语言处理能力&am…...

前端学习<三>CSS进阶——0102-CSS布局样式
前言 css 进阶的主要内容如下。 1、css 非布局样式 html 元素的分类和特性 css 选择器 css 常见属性(非布局样式) 2、css 布局相关 css 布局属性和组合解析 常见布局方案 三栏布局案例 3、动画和效果 属于 css 中最出彩的内容。 多背景多投影特…...

关于51单片机TMOD定时器的安全配置
定时器介绍: -------------------------------------------------------------------------------------------------------------------------- 首先配置的是控制寄存器 TCON 说直白点,这个寄存器就是用来计数的,打开计时器,关…...

Unity 主线程和其他线程之间的数据访问
在Unity中,主线程和其他线程之间的数据访问需要小心处理,因为在多线程环境下,不当的数据访问可能导致竞争条件和数据不一致性。 在Unity中,主线程通常用于处理用户输入、更新游戏逻辑和渲染。其他线程通常用于执行耗时的计算、加…...

电商系列之风控安全
> 插:AI时代,程序员或多或少要了解些人工智能,前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 坚持不懈,越努力越幸运,大家…...