Springboot 集成 RocketMQ(进阶-消息)
0. 入门篇
Springboot 集成 RocketMq(入门)-CSDN博客
1. 异步消息
1.1 生产者
@GetMapping("/send/async/{messageBody}")public String sendAsyncMsg(@PathVariable("messageBody") String messageBody) {// 构建消息对象Message message = new Message();message.setTopic("async");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody(messageBody.getBytes());rocketMqTemplate.asyncSend("async", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("sendAsyncMsg success messageBody:{}", messageBody);}@Overridepublic void onException(Throwable throwable) {log.error("sendAsyncMsg fail messageBody:{}", messageBody);}});return "OK";}
1.2 消费者
@Component
@RocketMQMessageListener(topic = "async", consumerGroup = "async")
@Slf4j
public class MyAsyncConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑log.info("Received async message: {}", message);}}
2.广播消息
2.1 生产者
@GetMapping("/send/broadcast/{messageBody}")public String sendBroadcastMsg(@PathVariable("messageBody") String messageBody) {// 发送消息rocketMqTemplate.convertAndSend("broadcast", messageBody);return "OK";}
2.2 消费者
消费者监听注解上设置 messageModel = MessageModel.BROADCASTING,默认是(MessageModel.CLUSTERING)。
@Component
@RocketMQMessageListener(topic = "broadcast", consumerGroup = "broadcast1", messageModel = MessageModel.BROADCASTING )
@Slf4j
public class MyBroadcast1Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑log.info("MyBroadcast1Consumer Received broadcast message: {}", message);}}@Component
@RocketMQMessageListener(topic = "broadcast", consumerGroup = "broadcast2", messageModel = MessageModel.BROADCASTING )
@Slf4j
public class MyBroadcast2Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑log.info("MyBroadcast2Consumer Received broadcast message: {}", message);}}
3. 延时、定时消息
3.1 延时消息
3.1.1 生产者
message.setDelayTimeMs(10);
@GetMapping("/send/delayed/{messageBody}")public String sendDelayedMsg(@PathVariable("messageBody") String messageBody) {// 发送消息Message message = new Message();message.setTopic("delayed");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody(messageBody.getBytes(StandardCharsets.UTF_8));// 延迟 10s 后投递message.setDelayTimeMs(10);// 发送消息,打印日志SendResult sendResult = null;try {sendResult = defaultMqProducer.send(message);log.info("sendDelayedMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {log.info("sendDelayedMsg error", e);return "FAIL";}return "OK";}
3.1.2 消费者
@Component
@RocketMQMessageListener(topic = "delayed", consumerGroup = "delayed")
@Slf4j
public class MyDelayedConsumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {// 处理消息的逻辑log.info("Received delayed message: {}", message);}}
3.2 定时消息
3.2.1 生产者
message.setDeliverTimeMs(System.currentTimeMillis() + 10);
@GetMapping("/send/scheduled/{messageBody}")public String sendScheduledMsg(@PathVariable("messageBody") String messageBody) {// 发送消息Message message = new Message();message.setTopic("scheduled");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody(messageBody.getBytes(StandardCharsets.UTF_8));// 定时 10s 后投递message.setDeliverTimeMs(System.currentTimeMillis() + 10);// 发送消息,打印日志SendResult sendResult = null;try {sendResult = defaultMqProducer.send(message);log.info("sendScheduledMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {log.info("sendScheduledMsg error", e);return "FAIL";}return "OK";}
3.2.2 消费者
@Component
@RocketMQMessageListener(topic = "scheduled", consumerGroup = "scheduled")
@Slf4j
public class MyScheduledConsumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {// 处理消息的逻辑log.info("Received scheduled message: {}", message);}}
4.批量消息
4.1 生产者
@GetMapping("/send/batch/{messageBody}")public String sendBatchMsg(@PathVariable("messageBody") String messageBody) {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message message = new Message();message.setTopic("batch");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody((messageBody + i).getBytes(StandardCharsets.UTF_8));}// 发送消息,打印日志SendResult sendResult = null;try {sendResult = defaultMqProducer.send(messageList);log.info("sendBatchMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {log.info("sendBatchMsg error", e);return "FAIL";}return "OK";}
4.2 消费者
@Component
@RocketMQMessageListener(topic = "batch", consumerGroup = "batch")
@Slf4j
public class MyBatchConsumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {// 处理消息的逻辑log.info("Received batch message: {}", message);}}
5.顺序消息
5.1 局部有序
局部消息指的是消费者消费某个topic的某个队列中的消息是顺序的【队列扩容时操作部分数据乱序】。
5.1.1 生产者
@GetMapping("/send/partOrder/{targetId}/{messageBody}")public String sendPartOrderMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {Message message = new Message();message.setTopic("partOrder");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody((targetId + "_1_" + messageBody).getBytes(StandardCharsets.UTF_8));// 方法1// 同步顺序消息// 默认根据id值采用SelectMessageQueueByHash策略来选择MessageQueuerocketMqTemplate.syncSendOrderly("partOrder", message, targetId.toString());// 方法2// 异步顺序消息// 默认根据id值采用SelectMessageQueueByHash策略来选择MessageQueuemessage.setBody((targetId + "_2_" + messageBody).getBytes(StandardCharsets.UTF_8));rocketMqTemplate.asyncSendOrderly("partOrder", message, targetId.toString(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("sendPartOrderMsg success targetId:{}, messageBody:{}", targetId, messageBody);}@Overridepublic void onException(Throwable throwable) {log.error("sendPartOrderMsg fail targetId:{}, messageBody:{}", targetId, messageBody);}});// 方法3// 发送消息,打印日志SendResult sendResult = null;try {message.setBody((targetId + "_3_" + messageBody).getBytes(StandardCharsets.UTF_8));sendResult = defaultMqProducer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object targetId) {Integer id = (Integer) targetId;int index = id % list.size();return list.get(index);}}, targetId);log.info("sendPartOrderMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {log.info("sendPartOrderMsg error", e);return "FAIL";}return "OK";}
5.1.2 消费者
@Component
@RocketMQMessageListener(topic = "partOrder", consumerGroup = "partOrder", consumeMode = ConsumeMode.ORDERLY)
@Slf4j
public class MyPartOrderConsumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {// 处理消息的逻辑log.info("Received partOrder message: {}", message);}}
5.2 全局有序
消费者消费全部消息都是顺序的,只能通过一个某个topic只有一个队列才能实现,这种应用场景较少,且性能较差。【向唯一队列中发送消息,队列无法扩展】。
5.2.1 生产者
@GetMapping("/send/overallOrder/{targetId}/{messageBody}")public String sendOverallOrderMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {Message message = new Message();message.setTopic("partOrder");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody((targetId + "_1_" + messageBody).getBytes(StandardCharsets.UTF_8));// 方法1// 同步顺序消息// 默认根据id值采用SelectMessageQueueByHash策略来选择MessageQueuerocketMqTemplate.syncSendOrderly("overallOrder", message, OVER_ALL_ORDER_KEY);// 方法2// 异步顺序消息// 默认根据id值采用SelectMessageQueueByHash策略来选择MessageQueuemessage.setBody((targetId + "_2_" + messageBody).getBytes(StandardCharsets.UTF_8));rocketMqTemplate.asyncSendOrderly("overallOrder", message, OVER_ALL_ORDER_KEY, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("sendOverallOrderMsg success targetId:{}, messageBody:{}", targetId, messageBody);}@Overridepublic void onException(Throwable throwable) {log.error("sendOverallOrderMsg fail targetId:{}, messageBody:{}", targetId, messageBody);}});// 方法3// 发送消息,打印日志SendResult sendResult = null;try {message.setBody((targetId + "_3_" + messageBody).getBytes(StandardCharsets.UTF_8));sendResult = defaultMqProducer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object targetId) {return list.get(0);}}, targetId);log.info("sendOverallOrderMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {log.info("sendOverallOrderMsg error", e);return "FAIL";}return "OK";}
6.事务消息
6.1 生产者
@RestController
@Slf4j
public class TransactionProducerController {@Resourceprivate RocketMQTemplate rocketMqTemplate;@GetMapping("/send/transaction/{targetId}/{messageBody}")public String sendTransactionMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {Message message = MessageBuilder.withPayload(messageBody).build();TransactionSendResult transaction = rocketMqTemplate.sendMessageInTransaction("transaction", message, targetId);return transaction.getTransactionId();}}@Slf4j
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener {/** 执行本地事务(在发送消息成功时执行) */@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {//模拟一个处理结果int index=2;/*** 模拟返回事务状态*/switch (index){case 1://处理业务String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);log.info("本地事务回滚,回滚消息,"+jsonStr);//返回ROLLBACK状态的消息会被丢弃return RocketMQLocalTransactionState.ROLLBACK;case 2://返回UNKNOW状态的消息会等待Broker进行事务状态回查log.info("需要等待Broker进行事务状态回查");return RocketMQLocalTransactionState.UNKNOWN;default:log.info("事务提交,消息正常处理");//返回COMMIT状态的消息会立即被消费者消费到return RocketMQLocalTransactionState.COMMIT;}}/*** 检查本地事务的状态* 回查间隔时间:系统默认每隔60秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。* 第一次消息回查最快*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String transactionId = message.getHeaders().get("__transactionId__").toString();log.info("检查本地事务状态,transactionId:{}", transactionId);return RocketMQLocalTransactionState.COMMIT;}
}
6.2 消费者
@Component
@RocketMQMessageListener(topic = "transaction", consumerGroup = "transaction")
@Slf4j
public class MyTransactionConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑log.info("Received transaction message: {}", message);}}
7.单项消息
单向发送只负责发送消息,不等待RocketMQ服务器返回的发送结果,也不提供回调函数来接收RocketMQ服务器的响应结果,只负责发送至于发送成功还是发送失败并不考虑。通常用于对可靠性要求不高的场景。
@GetMapping("/send/oneWay/{messageBody}")public String sendOneWayMsg(@PathVariable("messageBody") String messageBody) {// 构建消息对象Message message = new Message();message.setTopic("oneWay");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody(messageBody.getBytes());// 单向不可靠消息 void 方法无返回值rocketMqTemplate.sendOneWay("oneWay", message);// 同步可靠消息SendResult syncResult = rocketMqTemplate.syncSend("oneWay", message);// 异步可到消息rocketMqTemplate.asyncSend("oneWay", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("sendOneWayMsg success messageBody:{}", messageBody);}@Overridepublic void onException(Throwable throwable) {log.error("sendOneWayMsg fail messageBody:{}", messageBody);}});return "OK";}
相关文章:
Springboot 集成 RocketMQ(进阶-消息)
0. 入门篇 Springboot 集成 RocketMq(入门)-CSDN博客 1. 异步消息 1.1 生产者 GetMapping("/send/async/{messageBody}")public String sendAsyncMsg(PathVariable("messageBody") String messageBody) {// 构建消息对象Message m…...
10 索引优化与查询优化
文章目录 索引失效案例关联查询优化对于左外连接对于内连接JOIN语句原理简单嵌套循环连接SNLJ索引嵌套循环连接INLJ块嵌套循环连接BNLJHash Join 子查询优化排序优化filesort算法:双路排序和单路排序 分组优化分页优化优先考虑覆盖索引索引下推ICP使用条件 其他查询…...
linux PVE安装
先下载安装包: ISO - Proxmox Virtual Environment 普通电脑主机的话,做个U盘启动盘,进行刷机即可。 如果还没制作U盘启动盘,建议用这个,方便多个镜像切换 Download . Ventoy 按照刷机提示页面一步步配置即可&#…...
ZZ038 物联网应用与服务赛题第J套
2023年全国职业院校技能大赛 中职组 物联网应用与服务 任 务 书 (J卷) 赛位号:______________ 竞赛须知 一、注意事项 1.检查硬件设备、电脑设备是否正常。检查竞赛所需的各项设备、软件和竞赛材料等; 2.竞赛任务中所使用…...
【寒武纪(3)】媒体处理系统的系统控制、视频输入和后处理子系统
系统控制 文章目录 系统控制1、配置视频缓存池Video Pool2、配置硬件IP为在线工作(不通过DDR数据交互)/ 离线工作(写入DDR)模式3、硬IP可以使用 非Video Block (VB)内存4、配置是否启动内存传递的压缩 视频…...
Linux下使用vscode编写Python项目
我此处是使用VScode远程连接的服务器,具体方法可看如下: 1、vscode中安装Python插件 按上面步骤安装好Python插件后,重启vscode; 2、选择Python解释器 创建Python项目结构: 按下F1,打开vscode命令栏&am…...
使用 curator 连接 zookeeper 集群 Invalid config event received
dubbo整合zookeeper 如图,错误日志 2023-11-04 21:16:18.699 ERROR 7459 [main-EventThread] org.apache.curator.framework.imps.EnsembleTracker Caller0 at org.apache.curator.framework.imps.EnsembleTracker.processConfigData(EnsembleTracker.java…...
大促期间也要做好低价治理
低价链接无处不在,对于品牌来说,日常治理低价是常规操作,那面对价格变化更快、促销信息更丰富的大促,对低价的治理要求会更高。否则容易被未授权在大促双十一、六一八期间分食流量。 力维网络有专业的团队为品牌提供低价治理服务&…...
【c++】——类和对象(中)——默认成员函数(上)
【学习目标】 1. 类的6个默认成员函数 2. 构造函数 3. 析构函数 4. 拷贝构造函数 目录 一.类的6个默认成员函数 二. 构造函数 2.1 概念 2.2.特性 三.析构函数 3.1.概念 3.2 特性 四.拷贝构造函数 4.1.概念 4.2.特性 一.类的6个默认成员函数 如果一个类中什么成员…...
钉钉企业微应用开发C#-HTTP回调接口
官方的STREAM回调推送的方式,试了几次都认证不过,就放弃了还是用HTTP的模式吧。 /// <summary>/// 应用回调/// </summary>/// <param name"model"></param>/// <returns></returns>public static Dictio…...
Rust编程基础之条件表达式和循环
1.if表达式 if 表达式允许根据条件执行不同的代码分支, 以下代码是一个典型的使用if表达式的例子: fn main() {let number 3; if number < 5 {println!("condition was true");} else {println!("condition was false");} } 所有的 if 表达式都以…...
MATLAB算法实战应用案例精讲-【人工智能】ROS机器人(补充篇)
目录 前言 ROS 机器人导航调参 1 速度和加速度 2 全局路径规划 3 局部路径规划...
基于8086汽车智能小车控制系统
**单片机设计介绍,基于8086汽车智能小车控制系统 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于 8086 的汽车智能小车控制系统是一种将微处理器技术应用于汽车控制的系统。下面是其主要的设计介绍: 硬…...
全光谱大面积氙光灯太阳光模拟器老化测试
氙灯光源太阳光模拟器广泛应用于光解水产氢、光化学催化、二氧化碳制甲醇、光化学合成、光降解污染物、 水污染处理、生物光照,光学检测、太阳能电池研究、荧光材料测试(透射、反射、吸收) 太阳能电池特性测试,光热转化,光电材料特性测试,生物…...
linux添加一条到中间路由器的路由
有时候需要配置一些明细路由,不能直接通过网关进行路由转发 配置示例 ip route add 10.0.12.0/24 via 10.0.41.1 dev bond0 这个命令是用于在Linux操作系统上配置IP路由的命令。具体来说,这个命令的含义是: ip route add: 这部分表示要添加…...
不同MySQL服务的表以及库的数据迁移(/备份)
目标: 将本地主机上usernameroot,passwordroot,port3307的MySQL服务中migration_one数据库的table_11数据表导出到本地的D:\start_java\XinQiUtilsOrDemo\testMigrationMySQL\table_11.bak注意:目前D:\start_java\XinQiUtilsOrDemo\testMigrationMySQL该…...
聊聊芯片超净间的颗粒(particle)
在芯片制造领域,颗粒的存在可能对生产过程产生巨大影响。其中,每个微小的颗粒,无论是来自人员、设备,还是自然环境,都有可能在制程中引发故障,从而对产品性能产生负面影响。这就是为什么在芯片厂中…...
服务器(windows Server 2019为例)中的日志中文乱码的解决办法
1. 首先,打开控制面板,找到区域(Region),把Format设置为国语简体中文,点击高级(Administrative)后设置Current system locale为国语简体中文,按照图中步骤:...
Linux 学习(CentOS 7)
CentOS 7 学习 Linux系统内核作者: Linux内核版本 内核(kernel)是系统的心脏,是运行程序和管理像磁盘和打印机等硬件设备的核心程序,它提供了一个在裸设备与应用程序间的抽象层。 Linux内核版本又分为稳定版和开发版,两种版本是相互关联&am…...
架构决策记录 ADR
在项目和产品开发过程中,软件工程团队需要做出架构决策以实现其目标。这些决策可以是技术性的,也可以与流程相关。 技术决策:例如决定使用JBOSS Data Grid作为缓存解决方案还是选择Amazon Elasticache,或者决定使用AWS Network L…...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
Reasoning over Uncertain Text by Generative Large Language Models
https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...
力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
【 java 虚拟机知识 第一篇 】
目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...
基于Java+VUE+MariaDB实现(Web)仿小米商城
仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意:运行前…...
