RocketMq详解:三、RocketMq通用生产和消费方法改造
文章目录
- 1.背景
- 2.通用方法改造
- 2.1添加maven依赖
- 2.2 RocketMq基础配置
- 2.3 配置类
- 2.5 消息传输的对象和结果
- 2.4 消息生产者
- 2.5 消息消费者
- 2.6 功能测试
1.背景
在第二章:《RocketMq详解:二、SpringBoot集成RocketMq》中我们已经实现了消费基本生产和消费的实现,但是在真实的开发环境中如果按照这种方式去实现,冗余代码较多,且通过实现RocketMQListener中onMessage的方法去完成消息消费无返回结果,在后期的流程中不易维护,因此,本章将对这些问题进行二次改造和优化。
为了防止新同学从头开始学,本章将如何配置和实现简单在复述一下,至于具体怎么安装RocketMq,本文提供两种安装方法:
- 《MacOS环境下RocketMQ安装及部署 RocketMQ Dashboard 可视化》
- 《docker安装rocketMq》
2.通用方法改造
2.1添加maven依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency>
2.2 RocketMq基础配置
接着,在application.yml或application.properties文件中配置RocketMQ的相关参数,如NameServer地址、生产者组、消费者组等:
rocketmq:name-server: 127.0.0.1:9876# 生产者producer:group: boot_group_1# 消息发送超时时间send-message-timeout: 3000# 消息最大长度4Mmax-message-size: 4096# 消息发送失败重试次数retry-times-when-send-failed: 3# 异步消息发送失败重试次数retry-times-when-send-async-failed: 2# 消费者consumer:group: boot_group_1# 每次提取的最大消息数pull-batch-size: 5
上面的配置如果是在分布式环境下也可以配置在Apollo或nacos等配置中心里进行动态配置
2.3 配置类
在配置类中主要定义两个Bean的加载,即RocketMQTemplate和DefaultMQProducer,主要是提供消息发送的能力,即生产消息;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author ninesun* @ClassName RocketMqConfig* @description: 消息中间件配置类* @date 2024年05月19日* @version: 1.0*/
@Configuration
public class RocketMqConfig {@Value("${rocketmq.name-server}")private String nameServer;@Value("${rocketmq.producer.group}")private String producerGroup;@Value("${rocketmq.producer.send-message-timeout}")private Integer sendMsgTimeout;@Value("${rocketmq.producer.max-message-size}")private Integer maxMessageSize;@Value("${rocketmq.producer.retry-times-when-send-failed}")private Integer retryTimesWhenSendFailed;@Value("${rocketmq.producer.retry-times-when-send-async-failed}")private Integer retryTimesWhenSendAsyncFailed;@Beanpublic RocketMQTemplate rocketMqTemplate() {RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();rocketMqTemplate.setProducer(defaultMqProducer());return rocketMqTemplate;}@Beanpublic DefaultMQProducer defaultMqProducer() {DefaultMQProducer producer = new DefaultMQProducer();producer.setNamesrvAddr(this.nameServer);producer.setProducerGroup(this.producerGroup);producer.setSendMsgTimeout(this.sendMsgTimeout);producer.setMaxMessageSize(this.maxMessageSize);producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);return producer;}
}
在编写消费者和生产者之前,我们先统一一下消息传输的对象,以及消费的结果
2.5 消息传输的对象和结果
- 基本传输对象
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageDTO<T> {private T data;private Integer delayTime;@NotBlank(message = "Topic must not be blank")private String topic;@NotBlank(message = "Key must not be blank")private String key;private List<String> tags;private String messageType;
}
- 消费结果枚举
public enum MsgRetryStatus {SUCCEED(-1), FAILURE(0), RETRY(0), RETRY_1S(1), RETRY_5S(2),RETRY_10S(3), RETRY_30S(4), RETRY_1M(5), RETRY_2M(6), RETRY_3M(7),RETRY_4M(8), RETRY_5M(9), RETRY_6M(10), RETRY_7M(11), RETRY_8M(12),RETRY_9M(13), RETRY_10M(14), RETRY_20M(15), RETRY_30M(16), RETRY_1H(17), RETRY_2H(18),RETRY_1D(19), RETRY_3D(20), RETRY_7D(21), RETRY_14D(22), RETRY_21D(23), RETRY_28D(24),RETRY_35D(25);int level;MsgRetryStatus(int level) {this.level = level;}public int getLevel() {return level;}
}
2.4 消息生产者
@Component
public class MessageProduct {@Resourceprivate RocketMQTemplate rocketMqTemplate;public SendResult SendMessage(MessageDTO data) {// 创建一个Message对象org.springframework.messaging.Message<?> message = MessageBuilder.withPayload(JSON.toJSONString(data)).setHeader(RocketMQHeaders.KEYS,data.getKey()).setHeader(RocketMQHeaders.TAGS,data.getTags()).build();return rocketMqTemplate.syncSendDelayTimeSeconds(data.getTopic(), message, data.getDelayTime());}
}
2.5 消息消费者
在原始的实现中,我们通过实现RocketMQListener接口中的onMessage方法来完成消息的消费。
然而,这种方式存在一个问题:如果消息消费失败,我们无法获取到返回结果,也不便于进行错误处理和重试。
为了优化这个问题,我们可以使用@RocketMQMessageListener注解的returnTopic属性来指定一个返回主题,当消息消费失败时,将消息发送到这个返回主题中。
同时,我们可以创建一个专门的处理失败消息的消费者来处理这些返回的消息。
另外,我们还可以在onMessage方法中添加异常处理逻辑,以便在消费失败时进行错误处理和记录日志。
新增一个抽象类CommonConsumer去实现RocketMQListener,并提供一个有返回值的doConsumerProcess方法,去实现具体的消费逻辑,具体实现如下:
@Slf4j
@Component
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {public void onMessage(MessageDTO message) {log.info("收到延迟消息成功,消息体:{}", message);doConsumerProcess(message);}public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);
}
以boot-mq-topic为例,我们实现具体的消费,新增一个对象BootMqConsumer继承我们的CommonConsumer,来实现具体的消费逻辑
@Slf4j
@Component
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {public void onMessage(MessageDTO message) {try {// 处理消息的逻辑log.info("收到延迟消息成功,消息体:{}", message);MsgRetryStatus msgRetryStatus = doConsumerProcess(message);if (MsgRetryStatus.RETRY.equals(msgRetryStatus)|| MsgRetryStatus.FAILURE.equals(msgRetryStatus)) {//TODO 消费失败或重试 则发送重试Topic}} catch (Exception e) {// 记录错误日志e.printStackTrace();// 可以选择将失败消息发送到指定Topicthis.sendReturnMessgae();}}public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);private void doRetrytConsumerProcess() {//TODO:待实现,后面补上}
}
2.6 功能测试
package com.example.demo.controller;import com.alibaba.fastjson.JSON;
import com.example.demo.annoation.Idempotent;
import com.example.demo.mq.producer.MessageProduct;
import com.example.demo.po.MessageDTO;
import com.example.demo.po.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.Collections;
import java.util.UUID;@RestController
@Slf4j
@Validated
public class TestController01 {@Resourceprivate RocketMQTemplate rocketMqTemplate;@Resourceprivate DefaultMQProducer defaultMqProducer;@Resourceprivate MessageProduct messageProduct;@GetMapping("/send/msg4")public String sendMsg4() {try {User user = User.builder().id(1).name("ninesun").build();MessageDTO<User> messageDTO = MessageDTO.<User>builder().data(user).delayTime(3).topic("boot-mq-topic").key(String.valueOf(UUID.randomUUID())).build();SendResult sendResult = messageProduct.SendMessage(messageDTO);log.info("msgId:{},sendStatus:{},data:{}", sendResult.getMsgId(), sendResult.getSendStatus(), JSON.toJSONString(messageDTO));} catch (Exception e) {e.printStackTrace();}return "OK";}
}
附:User对象
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {private Integer id;private String name;private String desc;
}
访问:/send/msg4接口,结果如下:
至此,我们便可以在真实环境中很容易的去集成消息队列实现功能的解耦,流量的削峰。
相关文章:

RocketMq详解:三、RocketMq通用生产和消费方法改造
文章目录 1.背景2.通用方法改造2.1添加maven依赖2.2 RocketMq基础配置2.3 配置类2.5 消息传输的对象和结果2.4 消息生产者2.5 消息消费者2.6 功能测试 1.背景 在第二章:《RocketMq详解:二、SpringBoot集成RocketMq》中我们已经实现了消费基本生产和消费…...

基于SpringBoot+Vue+Uniapp的仓库点单小程序的详细设计和实现
2. 详细视频演示 文章底部名片,联系我获取更详细的演示视频 3. 论文参考 4. 项目运行截图 代码运行效果图 代码运行效果图 代码运行效果图 代码运行效果图代码运行效果图 代码运行效果图 5. 技术框架 5.1 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发…...

R语言从多波段tif数据中逐个提取单波段数据
在遥感和地理信息系统(GIS)领域,将多个波段存储在一个文件中可以更有效地进行数据压缩和管理,减少了存储空间的需求。 在R语言中,处理多波段栅格数据通常涉及以下步骤: 读取数据:使用raster包中…...

华为海思:大小海思的双轮驱动战略分析
华为海思,作为华为旗下的半导体设计部门,近年来在芯片设计领域取得了显著成就,成为了中国乃至全球芯片设计的重要力量。实际上,华为海思并非单一实体,而是由两个主要分支构成:大海思和小海思。这两个分支虽然同属华为海思,但在定位、产品布局以及市场策略上有所不同,共…...

LeetCode | 704.二分查找
标准的二分查找,直接上模板! class Solution(object):def search(self, nums, target):""":type nums: List[int]:type target: int:rtype: int"""l 0r len(nums) - 1while l < r:mid (l r 1) / 2if nums[mid] …...

TCP三握四挥
TCP三握(简述) 一开始,客户端和服务端都处于closed状态,服务端主动监听某个端口,处于listen状态 一握要进行C-S的第一个SYN发送,客户端会随机初始化序列号(client_isn)并将其置于TCP首部的序列号字段中,并且将SYN标志…...

java项目之大型商场应急预案管理系统(源码+文档)
项目简介 大型商场应急预案管理系统实现了以下功能: 大型商场应急预案管理系统的主要使用者管理员功能有个人中心,员工管理,预案信息管理,预案类型管理,事件类型管理,预案类型统计管理,事件类…...

【C++】--内存管理
👾个人主页: 起名字真南 👻个人专栏:【数据结构初阶】 【C语言】 【C】 目录 1 C/C内存分布2 C语言中动态内存管理方式 :3 C内存管理方式3.1 new/delete操作内置类型3.2 new和delete操作自定义类型 4 operator new与operator delete4.1 opera…...

【设计模式系列】模板方法模式
一、什么是模板方法模式 模板方法模式(Template Method Pattern)是一种行为型设计模式,它在父类中定义一个算法的框架,允许子类在不改变算法结构的情况下重写算法的某些特定步骤。这种模式非常适合于那些存在共同行为的类&#x…...

java8 Stream流详细API及用法
目录 整理的更全面的API及用法 创建Stream流 中间操作 filter 过滤 map 映射 flatMap 扁平映射 sorted 排序 limit 截断 skip 跳过 distinct 去重 peek 遍历 终端操作 forEach 遍历 forEachOrdered 顺序遍历 min 统计最小值 max 统计最大值 count 统计元素数量 f…...

Redis——持久化
文章目录 Redis持久化Redis的两种持久化的策略定期备份:RDB触发机制rdb的触发时机:手动执行save&bgsave保存测试不手动执行bgsave测试bgsave操作流程测试通过配置,自动生成rdb快照RDB的优缺点 实时备份:AOFAOF是否会影响到red…...

川字结构布局/国字结构布局
1.串字结构布局 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><style&g…...

2013年国赛高教杯数学建模C题古塔的变形解题全过程文档及程序
2013年国赛高教杯数学建模 C题 古塔的变形 由于长时间承受自重、气温、风力等各种作用,偶然还要受地震、飓风的影响,古塔会产生各种变形,诸如倾斜、弯曲、扭曲等。为保护古塔,文物部门需适时对古塔进行观测,了解各种变…...

web 0基础第一节 文本标签
这是一个html文件的基本结构 在vs code 中使用英文的 ! 可快捷设置这样的结构 <!-- --> 是在html写注释的结构 <!DOCTYPE html> <!--标识当前文档类型为html--> <html> …...

Zookeeper快速入门:部署服务、基本概念与操作
文章目录 一、部署服务1.下载与安装2.查看并修改配置文件3.启动 二、基本概念与操作1.节点类型特性总结使用场景示例查看节点查看节点数据 2.文件系统层次结构3.watcher 一、部署服务 1.下载与安装 下载: 一定要下载编译后的文件,后缀为bin.tar.gz w…...

【Sqlite】sqlite内部函数sqlite3_value_text特性
目录 ⚛️1 结论 ☪️2 说明 ☪️3 传入数值转成科学计数法 ♋3.1 只有整数部分 ♏3.2 只有小数部分 ♐3.3 整数小数 ⚛️1 结论 整数(sqlite视为int64)位数 > 20位,sqlite3_value_text 采用科学计数法。否则正常表示。 浮点数(sqlite视为double)的整数部…...
树莓派应用--AI项目实战篇来啦-4.OpenCV读取、写入和显示视频
1. 介绍 视频是由一张一张图片组成的,所以读取视频就相当于读取很多张图片,然后将其连起来cv2.VideoCapture可以捕获摄像头,但是针对树莓派的CSI摄像头调用方式采用了之前介绍的Picamera2 库,所以在调用的时候是有区别的ÿ…...

智能电子后视镜,汽车驾驶更安全,会是一种趋势
相比于传统的后视镜,智能电子后视镜的确有很多的优点。在下雨天和夜晚场景,电子后视镜可以说是表现优秀。 我之前一直以为我们国内是有规定不能使用电子后视镜。没想到,偶然刷到享界S9的视频,这电子后视镜,妥妥的给安排…...

IEC104规约的秘密之九----链路层和应用层
104规约从TCP往上,分成链路层和应用层。 如图,APCI就是链路层,ASDU的就是应用层 我们看到报文都是68打头的,因为应用层报文也要交给链路层发送,链路层增加了开头的6个字节再进行发送。 完全用于链路层的报文每帧都只有…...

最新Prompt预设词指令教程大全ChatGPT、AI智能体(300+预设词应用)
使用指南 直接复制在AI工具助手中使用(提问前) 可以前往已经添加好Prompt预设的AI系统测试使用(可自定义添加使用) SparkAi系统现已支持自定义添加官方GPTs(对专业领域更加专业,支持多模态文档࿰…...

Linux应用开发之网络套接字编程(实例篇)
服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了: 这一篇我们开始讲: 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下: 一、场景操作步骤 操作步…...

【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...
代理篇12|深入理解 Vite中的Proxy接口代理配置
在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...

让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比
在机器学习的回归分析中,损失函数的选择对模型性能具有决定性影响。均方误差(MSE)作为经典的损失函数,在处理干净数据时表现优异,但在面对包含异常值的噪声数据时,其对大误差的二次惩罚机制往往导致模型参数…...