RocketMQ笔记(八)SpringBoot整合RocketMQ广播消费消息
目录
- 一、简介
- 1.1、消费模式
- 二、消费者
- 2.1、maven依赖
- 2.2、application配置
- 2.3、消费监听
- 三、生产者
- 3.1、发送消息
- 3.2、运行结果
- 四、其他
一、简介
在之前的文章中,我们讲过了,同步发送单条消息,异步发送单条消息,发送单向消息,发送顺序消息,批量发送消息,事务消息,我们使用的模式都是 集群消费模式(Cluster),本文就来讲另外一种消息消费模式,也就是广播消费模式(Broadcast)
1.1、消费模式
在 Apache RocketMQ 中,实现消息消费的方式主要是两种:
-
集群消费模式(Cluster):
在集群消费模式下,同一个消费者组(Consumer Group)中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上,但是同一个消息只会被同一个消费者组中的一个消费者消费。 -
广播消费模式(Broadcast):
在广播消费模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,即每个消费者都会独立地消费消息。消息会被广播到同一个消费者组中的所有消费者实例上。
那么怎么使用广播消费模式呢?其实很简单,通过在消费者的 @RocketMQMessageListener 注解中设置 messageModel 参数为 MessageModel.BROADCASTING,即可将消费者设置为广播模式。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,从而实现了消息的广播消费。接下里看看具体操作吧。
二、消费者
2.1、maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq</artifactId><groupId>com.alian</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>11-broadcasting-message-one</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties></project>
2.2、application配置
application.properties
server.port=8011# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的消费者组
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 批量拉取消息的数量
rocketmq.consumer.pull-batch-size=10
# 广播消费模式
rocketmq.consumer.message-model=BROADCASTING
实际上对于本文来说,下面两个配置不用配置,也不会生效。
# 默认的消费者组
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 广播消费模式
rocketmq.consumer.message-model=BROADCASTING
因为优先的是@RocketMQMessageListener 注解中设置 consumerGroup 和messageModel 参数。
2.3、消费监听
@RocketMQMessageListener是RocketMQ提供的注解,用于配置消费者监听器的相关属性。
package com.alian.broadcasting;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
@RocketMQMessageListener(topic = "broadcasting_string_message_topic",consumerGroup = "BROADCASTING_CONSUMER_GROUP",messageModel = MessageModel.BROADCASTING)
public class StringMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("第一个消费者接收到的字符串消息: {}", message);// 处理消息的业务逻辑}
}
关于这里@RocketMQMessageListener的参数做个简单解释:
- topic:必填,指定该消费者订阅的Topic名称
- consumerGroup:必填,指定该消费者所属的消费者组名称,同一个组内的消费者实例通常进行负载均衡消费
- messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
MessageModel.java
public enum MessageModel {BROADCASTING("BROADCASTING"),CLUSTERING("CLUSTERING");private final String modeCN;MessageModel(String modeCN) {this.modeCN = modeCN;}public String getModeCN() {return this.modeCN;}
}
三、生产者
生产者我就复用前面批量消息发送的模块了
3.1、发送消息
@Slf4j
@SpringBootTest
public class SendBatchedBroadcastingMessageTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void syncSendStringMessagesWithBuilder() {String topic = "broadcasting_string_message_topic";for (int i = 0; i < 10; i++) {String message = "广播消息:" + i;Message<String> rocketMessage = MessageBuilder.withPayload(message).build();rocketMQTemplate.convertAndSend(topic, rocketMessage);}}@Testpublic void syncSendBatchStringMessagesWithBuilder() {String topic = "string_message_topic";String message = "批量广播消息:";List<Message<String>> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> rocketMessage = MessageBuilder.withPayload(message + i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSend发送批量消息SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);log.info("批量消息发送结果:{}",sendResult);}@AfterEachpublic void waiting() {try {Thread.sleep(3000L);} catch (InterruptedException e) {e.printStackTrace();}}}
我们先启动消费者,然后生产者发送消息。
3.2、运行结果
运行结果:
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:1
[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:0
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:3
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:2
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息:9
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:0
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:2
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:4
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:5
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:3
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:1
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:6
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:9
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息:8[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:0
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:1
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:2
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:3
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息:9
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:4
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:6
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:2
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:3
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:8
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:1
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:0
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:9
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息:5
四、其他
RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者;如果消费者重启或重新加入组,就能从对应Queue的offset处继续消费。
不过使用广播消费模式时,Consumer Group 的概念基本上没有作用,因为每个消费者实例都会独立地收到消息的一个副本。在广播模式下,同一个消费者组中的每个消费者都会收到消息的一个副本,每个消费者都会独立地消费消息,而不像集群消费模式中那样,一个消费者组中的消费者会共同消费消息。
广播消费模式在RocketMQ中最好的好处就是消费者解耦:不同的消费者可以独立消费消息,相互之间不受影响,提高了系统的扩展性,它的适用场景有:
- 日志收集 - 需要将日志数据分发给多个日志收集系统,每个系统都需要收到全量日志。
- 数据备份 - 实时备份数据到多个存储系统,确保数据有冗余副本。
- 信息推送 - 向多个推送通道投递并发送消息通知,如站内信、短信、Push等。
- 状态同步 - 将数据变更实时同步到集群的所有节点,保证集群节点状态一致。
- 负载均衡 - 将任务或请求广播给所有服务实例,由每个实例独立处理,实现负载分担。
- 监控告警 - 将系统监控数据广播给多个监控系统,多视角分析。
相关文章:
RocketMQ笔记(八)SpringBoot整合RocketMQ广播消费消息
目录 一、简介1.1、消费模式 二、消费者2.1、maven依赖2.2、application配置2.3、消费监听 三、生产者3.1、发送消息3.2、运行结果 四、其他 一、简介 在之前的文章中,我们讲过了,同步发送单条消息,异步发送单条消息,发送单向消息…...
Appium如何自动判断浏览器驱动
问题:有的测试机chrome是这个版本,有的是另一个版本,怎么能让自动判断去跑呢?? 解决办法:使用appium的chromedriverExecutableDir和chromedriverChromeMappingFile 切忌使用chromedriverExecutableDir和c…...
MVCC-多版本并发控制
MVCC(多版本并发控制)简介 在数据库系统中,并发控制是一个非常重要的话题。为了提高系统的并发性能和吞吐量,现代数据库系统通常使用多种技术来实现对数据的安全访问,其中一种重要的技术就是多版本并发控制࿰…...
c++找最高成绩
根据给定的程序,写成相关的成员函数,完成指定功能。 函数接口定义: 定义max函数,实现输出最高成绩对应的学号以及最高成绩值。 裁判测试程序样例: #include <iostream> using namespace std; class Student{…...
前端saas化部署
在项目中难免会遇到一些特殊的需求,例如同一套代码需要同时部署上两个不同的域名A和B。A和B的不同之处仅在于,例如一些背景图片,logo,展示模块的不同,其他业务逻辑是和展示模块是完全一样的。此时我们当然可以考虑单独…...
[Java基础揉碎]Math类
目录 基本介绍 方法一览(均为静态方法) 1) abs 绝对值 2) pow 求幂 3) ceil 向上取整 4) floor 向下取整 5) round 四舍五入 6) sqrt 求开方 7) random求随机数 8) max 求两个数的最大值 9) min 求两个数的最小值 基本介绍 Math类包含用于执行基本数学运算的方法&…...
MyBatis输入映射
1 parameterType parameterType:接口中方法参数的类型,类型必须是完全限定名或别名(稍后讲别名)。该属性非必须,因为Mybatis框架能自行判断具体传入语句的参数,默认值为未设置(unset)。<sel…...
金三银四,程序员求职季
随着春天的脚步渐近,对于许多程序员来说,一年中最繁忙、最重要的面试季节也随之而来。金三银四,即三月和四月,被广大程序员视为求职的黄金时期。在这两个月里,各大公司纷纷开放招聘,求职者们则通过一轮又一…...
[react优化] 避免组件或数据多次渲染/计算
代码如下 点击视图x➕1,导致视图更新, 视图更细导致a也重新大量计算!!这很浪费时间 function App() {const [x, setX] useState(3)const y x 2console.log(重新渲染, x, y);console.time(timer)let a 0for (let index 0; index < 1000000000; index) {a}console.timeE…...
「意」起出发 丨意大利OXO城市展厅盛大启幕,成都设计圈共襄盛举
4月8日,主题为“「意」起出发「智」见OXO”的意大利OXO城市展厅发布会在成都大悦城OXO成都城市展厅隆重举办。 大会现场,成都装饰协会领导,喜尔康董事长吴锡山,天合智能副董事长罗洁,意大利OXO卫浴市场部总监兰彬&…...
你不知道的JavaScript---深入理解 JavaScript 作用域
你好,我是小白Coding日志,一个热爱技术的程序员。在这里,我分享自己在编程和技术世界中的学习心得和体会。希望我的文章能够给你带来一些灵感和帮助。欢迎来到我的博客,一起在技术的世界里探索前行吧! 1. 什么是作用域…...
FPGA(Verilog)实现按键消抖
实现按键消抖功能: 1.滤除按键按下时的噪声和松开时的噪声信号。 2.获取已消抖的按键按下的标志信号。 3.实现已消抖的按键的连续功能。 Verilog实现 模块端口 key_filter(input wire clk ,input wire rst_n ,input wire key_in , //按下按键时为0output …...
第十二届蓝桥杯大赛软件赛省赛C/C++大学B组
第十二届蓝桥杯大赛软件赛省赛C/C 大学 B 组 文章目录 第十二届蓝桥杯大赛软件赛省赛C/C 大学 B 组1、空间2、卡片3、直线4、货物摆放5、路径6、时间显示7、砝码称重8、杨辉三角形9、双向排序10、括号序列 1、空间 1MB 1024KB 1KB 1024byte 1byte8bit // cout<<"2…...
面了钉钉搜广增算法岗(暑期实习),秒挂。。。。
节前,我们星球组织了一场算法岗技术&面试讨论会,邀请了一些互联网大厂朋友、参加社招和校招面试的同学,针对算法岗技术趋势、大模型落地项目经验分享、新手如何入门算法岗、该如何准备、面试常考点分享等热门话题进行了深入的讨论。 汇总…...
前端实现流文件下载的完整指南
在现代Web开发中,经常会遇到需要从服务器下载文件的情况。有时候这些文件是事先存储好的,可以通过简单的URL链接直接下载;但有时候,我们需要从数据流中动态生成文件并将其提供给用户。本篇博客将介绍如何在前端实现流文件下载的完…...
Kotlin:常用标准库函数(let、run、with、apply、also)
一、let 扩展函数 Kotlin标准库函数let可用于范围确定和空检查。当调用对象时,let执行给定的代码块并返回其最后一个表达式的结果。对象可以通过引用(默认情况下)或自定义名称在块中访问。 let扩展函数源码 let.kt文件代码 fun main() {println("isEmpty $is…...
雷军给年轻人的五点建议
前言 拿来激励自己,没事就看一看,给自己高一点的要求. 致刚入门的程序员五点建议 每个IT企业,尤其是初创企业,非常苦恼:找不到好的程序员。现在大学、软件学院及各种培训机构,每年培养几十万的程序员,毕业的每个人都…...
Unity DOTS物理引擎的核心分析与详解
最近DOTS发布了正式的版本,同时基于DOTS的理念实现了一套高性能的物理引擎,今天我们来给大家分享和介绍一下这个物理引擎的使用。 Unity.Physics的设计哲学 Unity.Physics是基于DOTS设计思想的一个高性能C#物理引擎的实现, 包含了物理刚体的迭代计算与碰撞检测等查…...
C++ //练习 12.4 在我们的check函数中,没有检查i是否大于0。为什么可以忽略这个检查?
C Primer(第5版) 练习 12.4 练习 12.4 在我们的check函数中,没有检查i是否大于0。为什么可以忽略这个检查? 环境:Linux Ubuntu(云服务器) 工具:vim 解释 size_type类型是无符号整…...
达梦备份与恢复
达梦备份与恢复 基础环境 操作系统:Red Hat Enterprise Linux Server release 7.9 (Maipo) 数据库版本:DM Database Server 64 V8 架构:单实例1 设置bak_path路径 --创建备份文件存放目录 su - dmdba mkdir -p /dm8/backup--修改dm.ini 文件…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...
多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...
Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...
VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP
编辑-虚拟网络编辑器-更改设置 选择桥接模式,然后找到相应的网卡(可以查看自己本机的网络连接) windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置,选择刚才配置的桥接模式 静态ip设置: 我用的ubuntu24桌…...
【Go语言基础【13】】函数、闭包、方法
文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数(函数作为参数、返回值) 三、匿名函数与闭包1. 匿名函数(Lambda函…...
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...
逻辑回归暴力训练预测金融欺诈
简述 「使用逻辑回归暴力预测金融欺诈,并不断增加特征维度持续测试」的做法,体现了一种逐步建模与迭代验证的实验思路,在金融欺诈检测中非常有价值,本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...
解决:Android studio 编译后报错\app\src\main\cpp\CMakeLists.txt‘ to exist
现象: android studio报错: [CXX1409] D:\GitLab\xxxxx\app.cxx\Debug\3f3w4y1i\arm64-v8a\android_gradle_build.json : expected buildFiles file ‘D:\GitLab\xxxxx\app\src\main\cpp\CMakeLists.txt’ to exist 解决: 不要动CMakeLists.…...
