RocketMQ笔记(八)SpringBoot整合RocketMQ广播消费消息
目录
- 一、简介
- 1.1、消费模式
- 二、消费者
- 2.1、maven依赖
- 2.2、application配置
- 2.3、消费监听
- 三、生产者
- 3.1、发送消息
- 3.2、运行结果
- 四、其他
一、简介
在之前的文章中,我们讲过了,同步发送单条消息,异步发送单条消息,发送单向消息,发送顺序消息,批量发送消息,事务消息,我们使用的模式都是 集群消费模式(Cluster),本文就来讲另外一种消息消费模式,也就是广播消费模式(Broadcast)
1.1、消费模式
在 Apache RocketMQ 中,实现消息消费的方式主要是两种:
-
集群消费模式(Cluster):
在集群消费模式下,同一个消费者组(Consumer Group)中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上,但是同一个消息只会被同一个消费者组中的一个消费者消费。 -
广播消费模式(Broadcast):
在广播消费模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,即每个消费者都会独立地消费消息。消息会被广播到同一个消费者组中的所有消费者实例上。
那么怎么使用广播消费模式呢?其实很简单,通过在消费者的 @RocketMQMessageListener 注解中设置 messageModel 参数为 MessageModel.BROADCASTING,即可将消费者设置为广播模式。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,从而实现了消息的广播消费。接下里看看具体操作吧。
二、消费者
2.1、maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq</artifactId><groupId>com.alian</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>11-broadcasting-message-one</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties></project>
2.2、application配置
application.properties
server.port=8011# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的消费者组
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 批量拉取消息的数量
rocketmq.consumer.pull-batch-size=10
# 广播消费模式
rocketmq.consumer.message-model=BROADCASTING
实际上对于本文来说,下面两个配置不用配置,也不会生效。
# 默认的消费者组
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 广播消费模式
rocketmq.consumer.message-model=BROADCASTING
因为优先的是@RocketMQMessageListener 注解中设置 consumerGroup 和messageModel 参数。
2.3、消费监听
@RocketMQMessageListener是RocketMQ提供的注解,用于配置消费者监听器的相关属性。
package com.alian.broadcasting;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
@RocketMQMessageListener(topic = "broadcasting_string_message_topic",consumerGroup = "BROADCASTING_CONSUMER_GROUP",messageModel = MessageModel.BROADCASTING)
public class StringMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("第一个消费者接收到的字符串消息: {}", message);// 处理消息的业务逻辑}
}
关于这里@RocketMQMessageListener的参数做个简单解释:
- topic:必填,指定该消费者订阅的Topic名称
- consumerGroup:必填,指定该消费者所属的消费者组名称,同一个组内的消费者实例通常进行负载均衡消费
- messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
MessageModel.java
public enum MessageModel {BROADCASTING("BROADCASTING"),CLUSTERING("CLUSTERING");private final String modeCN;MessageModel(String modeCN) {this.modeCN = modeCN;}public String getModeCN() {return this.modeCN;}
}
三、生产者
生产者我就复用前面批量消息发送的模块了
3.1、发送消息
@Slf4j
@SpringBootTest
public class SendBatchedBroadcastingMessageTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void syncSendStringMessagesWithBuilder() {String topic = "broadcasting_string_message_topic";for (int i = 0; i < 10; i++) {String message = "广播消息:" + i;Message<String> rocketMessage = MessageBuilder.withPayload(message).build();rocketMQTemplate.convertAndSend(topic, rocketMessage);}}@Testpublic void syncSendBatchStringMessagesWithBuilder() {String topic = "string_message_topic";String message = "批量广播消息:";List<Message<String>> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> rocketMessage = MessageBuilder.withPayload(message + i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSend发送批量消息SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);log.info("批量消息发送结果:{}",sendResult);}@AfterEachpublic void waiting() {try {Thread.sleep(3000L);} catch (InterruptedException e) {e.printStackTrace();}}}
我们先启动消费者,然后生产者发送消息。
3.2、运行结果
运行结果:
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:1
[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:0
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:3
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:2
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:9
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:0
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:2
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:4
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:5
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:3
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:1
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:6
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:9
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:8[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:0
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:1
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:2
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:3
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:9
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:4
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:6
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:2
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:3
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:8
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:1
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:0
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:9
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:5
四、其他
RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者;如果消费者重启或重新加入组,就能从对应Queue的offset处继续消费。
不过使用广播消费模式时,Consumer Group 的概念基本上没有作用,因为每个消费者实例都会独立地收到消息的一个副本。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,而不像集群消费模式中那样,一个消费者组中的消费者会共同消费消息。
广播消费模式在RocketMQ中最好的好处就是消费者解耦:不同的消费者可以独立消费消息,相互之间不受影响,提高了系统的扩展性,它的适用场景有:
- 日志收集 - 需要将日志数据分发给多个日志收集系统,每个系统都需要收到全量日志。
- 数据备份 - 实时备份数据到多个存储系统,确保数据有冗余副本。
- 信息推送 - 向多个推送通道投递并发送消息通知,如站内信、短信、Push等。
- 状态同步 - 将数据变更实时同步到集群的所有节点,保证集群节点状态一致。
- 负载均衡 - 将任务或请求广播给所有服务实例,由每个实例独立处理,实现负载分担。
- 监控告警 - 将系统监控数据广播给多个监控系统,多视角分析。
相关文章:
RocketMQ笔记(八)SpringBoot整合RocketMQ广播消费消息
目录 一、简介1.1、消费模式 二、消费者2.1、maven依赖2.2、application配置2.3、消费监听 三、生产者3.1、发送消息3.2、运行结果 四、其他 一、简介 在之前的文章中,我们讲过了,同步发送单条消息,异步发送单条消息,发送单向消息…...
Appium如何自动判断浏览器驱动
问题:有的测试机chrome是这个版本,有的是另一个版本,怎么能让自动判断去跑呢?? 解决办法:使用appium的chromedriverExecutableDir和chromedriverChromeMappingFile 切忌使用chromedriverExecutableDir和c…...
MVCC-多版本并发控制
MVCC(多版本并发控制)简介 在数据库系统中,并发控制是一个非常重要的话题。为了提高系统的并发性能和吞吐量,现代数据库系统通常使用多种技术来实现对数据的安全访问,其中一种重要的技术就是多版本并发控制࿰…...
c++找最高成绩
根据给定的程序,写成相关的成员函数,完成指定功能。 函数接口定义: 定义max函数,实现输出最高成绩对应的学号以及最高成绩值。 裁判测试程序样例: #include <iostream> using namespace std; class Student{…...
前端saas化部署
在项目中难免会遇到一些特殊的需求,例如同一套代码需要同时部署上两个不同的域名A和B。A和B的不同之处仅在于,例如一些背景图片,logo,展示模块的不同,其他业务逻辑是和展示模块是完全一样的。此时我们当然可以考虑单独…...
[Java基础揉碎]Math类
目录 基本介绍 方法一览(均为静态方法) 1) abs 绝对值 2) pow 求幂 3) ceil 向上取整 4) floor 向下取整 5) round 四舍五入 6) sqrt 求开方 7) random求随机数 8) max 求两个数的最大值 9) min 求两个数的最小值 基本介绍 Math类包含用于执行基本数学运算的方法&…...
MyBatis输入映射
1 parameterType parameterType:接口中方法参数的类型,类型必须是完全限定名或别名(稍后讲别名)。该属性非必须,因为Mybatis框架能自行判断具体传入语句的参数,默认值为未设置(unset)。<sel…...
金三银四,程序员求职季
随着春天的脚步渐近,对于许多程序员来说,一年中最繁忙、最重要的面试季节也随之而来。金三银四,即三月和四月,被广大程序员视为求职的黄金时期。在这两个月里,各大公司纷纷开放招聘,求职者们则通过一轮又一…...
[react优化] 避免组件或数据多次渲染/计算
代码如下 点击视图x➕1,导致视图更新, 视图更细导致a也重新大量计算!!这很浪费时间 function App() {const [x, setX] useState(3)const y x 2console.log(重新渲染, x, y);console.time(timer)let a 0for (let index 0; index < 1000000000; index) {a}console.timeE…...
「意」起出发 丨意大利OXO城市展厅盛大启幕,成都设计圈共襄盛举
4月8日,主题为“「意」起出发「智」见OXO”的意大利OXO城市展厅发布会在成都大悦城OXO成都城市展厅隆重举办。 大会现场,成都装饰协会领导,喜尔康董事长吴锡山,天合智能副董事长罗洁,意大利OXO卫浴市场部总监兰彬&…...
你不知道的JavaScript---深入理解 JavaScript 作用域
你好,我是小白Coding日志,一个热爱技术的程序员。在这里,我分享自己在编程和技术世界中的学习心得和体会。希望我的文章能够给你带来一些灵感和帮助。欢迎来到我的博客,一起在技术的世界里探索前行吧! 1. 什么是作用域…...
FPGA(Verilog)实现按键消抖
实现按键消抖功能: 1.滤除按键按下时的噪声和松开时的噪声信号。 2.获取已消抖的按键按下的标志信号。 3.实现已消抖的按键的连续功能。 Verilog实现 模块端口 key_filter(input wire clk ,input wire rst_n ,input wire key_in , //按下按键时为0output …...
第十二届蓝桥杯大赛软件赛省赛C/C++大学B组
第十二届蓝桥杯大赛软件赛省赛C/C 大学 B 组 文章目录 第十二届蓝桥杯大赛软件赛省赛C/C 大学 B 组1、空间2、卡片3、直线4、货物摆放5、路径6、时间显示7、砝码称重8、杨辉三角形9、双向排序10、括号序列 1、空间 1MB 1024KB 1KB 1024byte 1byte8bit // cout<<"2…...
面了钉钉搜广增算法岗(暑期实习),秒挂。。。。
节前,我们星球组织了一场算法岗技术&面试讨论会,邀请了一些互联网大厂朋友、参加社招和校招面试的同学,针对算法岗技术趋势、大模型落地项目经验分享、新手如何入门算法岗、该如何准备、面试常考点分享等热门话题进行了深入的讨论。 汇总…...
前端实现流文件下载的完整指南
在现代Web开发中,经常会遇到需要从服务器下载文件的情况。有时候这些文件是事先存储好的,可以通过简单的URL链接直接下载;但有时候,我们需要从数据流中动态生成文件并将其提供给用户。本篇博客将介绍如何在前端实现流文件下载的完…...
Kotlin:常用标准库函数(let、run、with、apply、also)
一、let 扩展函数 Kotlin标准库函数let可用于范围确定和空检查。当调用对象时,let执行给定的代码块并返回其最后一个表达式的结果。对象可以通过引用(默认情况下)或自定义名称在块中访问。 let扩展函数源码 let.kt文件代码 fun main() {println("isEmpty $is…...
雷军给年轻人的五点建议
前言 拿来激励自己,没事就看一看,给自己高一点的要求. 致刚入门的程序员五点建议 每个IT企业,尤其是初创企业,非常苦恼:找不到好的程序员。现在大学、软件学院及各种培训机构,每年培养几十万的程序员,毕业的每个人都…...
Unity DOTS物理引擎的核心分析与详解
最近DOTS发布了正式的版本,同时基于DOTS的理念实现了一套高性能的物理引擎,今天我们来给大家分享和介绍一下这个物理引擎的使用。 Unity.Physics的设计哲学 Unity.Physics是基于DOTS设计思想的一个高性能C#物理引擎的实现, 包含了物理刚体的迭代计算与碰撞检测等查…...
C++ //练习 12.4 在我们的check函数中,没有检查i是否大于0。为什么可以忽略这个检查?
C Primer(第5版) 练习 12.4 练习 12.4 在我们的check函数中,没有检查i是否大于0。为什么可以忽略这个检查? 环境:Linux Ubuntu(云服务器) 工具:vim 解释 size_type类型是无符号整…...
达梦备份与恢复
达梦备份与恢复 基础环境 操作系统:Red Hat Enterprise Linux Server release 7.9 (Maipo) 数据库版本:DM Database Server 64 V8 架构:单实例1 设置bak_path路径 --创建备份文件存放目录 su - dmdba mkdir -p /dm8/backup--修改dm.ini 文件…...
如何快速批量下载知网文献?CNKI-download自动化工具终极指南
如何快速批量下载知网文献?CNKI-download自动化工具终极指南 【免费下载链接】CNKI-download :frog: 知网(CNKI)文献下载及文献速览爬虫 项目地址: https://gitcode.com/gh_mirrors/cn/CNKI-download 对于学术研究者和学生来说,从知网࿰…...
基于 LlamaFactory 与 LoRA 微调开源大模型:构建高效文本分类系统的实践指南
1. 为什么选择LlamaFactoryLoRA做文本分类? 最近在做一个政务工单分类项目时,我发现传统BERT模型遇到三个头疼问题:标注成本高(需要上万条数据)、领域迁移难(换个场景就失效)、小样本表现差&…...
FPGA实战:手把手教你用Verilog给NAND Flash数据上把“安全锁”(附完整ECC代码)
FPGA实战:用Verilog为NAND Flash打造硬件级ECC防护系统 1. 为什么你的NAND Flash需要硬件ECC? NAND Flash存储芯片在工业控制、物联网终端和边缘计算设备中扮演着关键角色,但它的物理特性导致数据可靠性存在先天缺陷。想象一下,当…...
Java微服务在Istio中出现“偶发503 no healthy upstream”?7分钟定位Sidecar健康检查盲区与Liveness Probe冲突真相
第一章:Java微服务在Istio中偶发503问题的现象与影响在基于Istio构建的服务网格环境中,Java微服务(尤其是采用Spring Cloud Kubernetes或原生Spring Boot Istio Sidecar部署模式)频繁出现偶发性HTTP 503 Service Unavailable响应…...
Lychee-rerank-mm在音乐推荐中的创新应用
Lychee-rerank-mm在音乐推荐中的创新应用 1. 引言 你有没有遇到过这样的情况:在音乐平台上听到一首很喜欢的歌,想找类似的音乐,但系统推荐的歌曲却总是差强人意?要么封面风格完全不搭,要么歌词主题南辕北辙ÿ…...
别再用asyncio硬扛高并发了!无GIL环境下Python原生多线程性能翻倍的6个核心调优参数
第一章:Python无锁GIL环境下的并发模型演进全景Python长期以来受全局解释器锁(GIL)制约,导致多线程无法真正并行执行CPU密集型任务。近年来,随着CPython 3.12正式引入实验性“无GIL构建选项”(--without-py…...
P1095 守望者的逃离【洛谷算法习题】
P1095 守望者的逃离 网页链接 P1095 守望者的逃离 题目背景 NOIP2007 普及组 T3 题目描述 恶魔猎手尤迪安野心勃勃,他背叛了暗夜精灵,率领深藏在海底的娜迦族企图叛变。 守望者在与尤迪安的交锋中遭遇了围杀,被困在一个荒芜的大岛上。…...
智能车调参手记:我用Kp=200, Ki=60, Kd=40让小车稳如老狗
智能车调参手记:我用Kp200, Ki60, Kd40让小车稳如老狗 凌晨三点的实验室里,咖啡杯已经见底,眼前的智能车在测试跑道上又一次冲出了弯道。这已经是本周第七次熬夜调试,上坡时的速度波动问题始终困扰着我们。就在准备放弃的时候&…...
告别信息混乱:Trilium中文版让知识管理像整理衣柜一样简单
告别信息混乱:Trilium中文版让知识管理像整理衣柜一样简单 【免费下载链接】trilium-translation Translation for Trilium Notes. Trilium Notes 中文适配, 体验优化 项目地址: https://gitcode.com/gh_mirrors/tr/trilium-translation 还在为英文笔记软件的…...
解决Gradio share=True报错:手动下载并配置frpc_linux_amd64_v0.3文件的保姆级教程
解决Gradio shareTrue报错的完整实战指南:从手动配置frpc到深度优化 当你兴奋地准备向客户展示刚完成的Gradio应用时,却在终端看到红色的报错信息——shareTrue参数失效了。这种场景对开发者来说再熟悉不过:本地调试一切正常,但需…...
