549、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ(三)】 2023.02.28
目录
- 一、Spring 整合 RocketMQ
- 1.1 消息生产者
- 1.2 消息消费者
- 1.3 Spring 配置文件
- 1.4 运行实例程序
- 二、参考链接
一、Spring 整合 RocketMQ
不同于 RabbitMQ、ActiveMQ、Kafka 等消息中间件,Spring 社区已经通过多种方式提供了对这些中间件产品集成,例如通过 spring-jms 整合 ActiveMQ、通过 Spring AMQP 项目下的 spring-rabbit 整合 RabbitMQ、通过 spring-kafka 整合 kafka ,通过他们可以在 Spring 项目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三种方式,一是将消息生产者和消费者定义成 bean 对象交由 Spring 容器管理,二是使用 RocketMQ 社区的外部项目 rocketmq-jms(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms)然后通过 spring-jms 方式集成使用,三是如果你的应用是基于 spring-boot 的,可以使用 RocketMQ 的外部项目 rocketmq-spring-boot-starter(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter)比较方便的收发消息。
总的来讲 rocketmq-jms 项目实现了 JMS 1.1 规范的部分内容,目前支持 JMS 中的发布/订阅模型收发消息。rocketmq-spring-boot-starter 项目目前已经支持同步发送、异步发送、单向发送、顺序消费、并行消费、集群消费、广播消费等特性,如果比较喜欢 Spring Boot 这种全家桶的快速开发框架并且现有特性已满足业务要求可以使用该项目。当然从 API 使用上最灵活的还是第一种方式,下面以第一种方式为例简单看下Spring 如何集成 RocketMQ 的。
1.1 消息生产者
package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;public class SpringProducer {private Logger logger = Logger.getLogger(getClass());private String producerGroupName;private String nameServerAddr;private DefaultMQProducer producer;public SpringProducer(String producerGroupName, String nameServerAddr) {this.producerGroupName = producerGroupName;this.nameServerAddr = nameServerAddr;}public void init() throws Exception {logger.info("开始启动消息生产者服务...");//创建一个消息生产者,并设置一个消息生产者组producer = new DefaultMQProducer(producerGroupName);//指定 NameServer 地址producer.setNamesrvAddr(nameServerAddr);//初始化 SpringProducer,整个应用生命周期内只需要初始化一次producer.start();logger.info("消息生产者服务启动成功.");}public void destroy() {logger.info("开始关闭消息生产者服务...");producer.shutdown();logger.info("消息生产者服务已关闭.");}public DefaultMQProducer getProducer() {return producer;}
}
消息生产者就是把生产者 DefaultMQProducer 对象的生命周期分成构造函数、init、destroy 三个方法,构造函数中将生产者组名、NameServer 地址作为变量由 Spring 容器在配置时提供,init 方法中实例化 DefaultMQProducer 对象、设置 NameServer 地址、初始化生产者对象,destroy 方法用于生产者对象销毁时清理资源。
1.2 消息消费者
package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;public class SpringConsumer {private Logger logger = Logger.getLogger(getClass());private String consumerGroupName;private String nameServerAddr;private String topicName;private DefaultMQPushConsumer consumer;private MessageListenerConcurrently messageListener;public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {this.consumerGroupName = consumerGroupName;this.nameServerAddr = nameServerAddr;this.topicName = topicName;this.messageListener = messageListener;}public void init() throws Exception {logger.info("开始启动消息消费者服务...");//创建一个消息消费者,并设置一个消息消费者组consumer = new DefaultMQPushConsumer(consumerGroupName);//指定 NameServer 地址consumer.setNamesrvAddr(nameServerAddr);//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅指定 Topic 下的所有消息consumer.subscribe(topicName, "*");//注册消息监听器consumer.registerMessageListener(messageListener);// 消费者对象在使用之前必须要调用 start 初始化consumer.start();logger.info("消息消费者服务启动成功.");}public void destroy(){logger.info("开始关闭消息消费者服务...");consumer.shutdown();logger.info("消息消费者服务已关闭.");}public DefaultMQPushConsumer getConsumer() {return consumer;}}
同消息生产者类似,消息消费者是把生产者 DefaultMQPushConsumer 对象的生命周期分成构造函数、init、destroy 三个方法,具体含义在介绍 Java 访问 RocketMQ 实例时已经介绍过了,不再赘述。当然,有了消费者对象还需要消息监听器在接收到消息后执行具体的处理逻辑。
package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.List;public class MessageListener implements MessageListenerConcurrently {private Logger logger = Logger.getLogger(getClass());public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (list != null) {for (MessageExt ext : list) {try {logger.info("监听到消息 : " + new String(ext.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}
消息监听器类就是把前面 Java 示例中注册消息监听器时声明的匿名内部类代码抽取出来定义成单独一个类而已。
1.3 Spring 配置文件
因为只使用 Spring 框架集成,所以除了 Sping 框架核心 jar 包外不需要额外添加依赖包了。本例中将消息生产者和消息消费者分成两个配置文件,这样能更好的演示收发消息的效果。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsd"><bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy"><constructor-arg name="nameServerAddr" value="localhost:9876"/><constructor-arg name="producerGroupName" value="spring_producer_group"/></bean>
</beans>
消息生产者配置很简单,定义了一个消息生产者对象,该对象初始化时调用 init 方法,对象销毁前执行 destroy 方法,将 Name Server 地址和生产者组配置好。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsd"><bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" /><bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy"><constructor-arg name="nameServerAddr" value="localhost:9876"/><constructor-arg name="consumerGroupName" value="spring_consumer_group"/><constructor-arg name="topicName" value="spring-rocketMQ-topic" /><constructor-arg name="messageListener" ref="messageListener" /></bean></beans>
消息消费者同消息生产者配置类似,多了一个消息监听器对象的定义和绑定。
1.4 运行实例程序
按前述步骤 启动 Name Server 和 Broker,接着运行消息生产者和消息消费者程序,简化起见我们用两个单元测试类模拟这两个程序:
package org.study.mq.rocketMQ.spring;import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringProducerTest {private ApplicationContext container;@Beforepublic void setup() {container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");}@Testpublic void sendMessage() throws Exception {SpringProducer producer = container.getBean(SpringProducer.class);for (int i = 0; i < 20; i++) {//创建一条消息对象,指定其主题、标签和消息内容Message msg = new Message("spring-rocketMQ-topic",null,("Spring RocketMQ demo " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */);//发送消息并返回结果SendResult sendResult = producer.getProducer().send(msg);System.out.printf("%s%n", sendResult);}}
}
SpringProducerTest 类模拟消息生产者发送消息。
package org.study.mq.rocketMQ.spring;import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringConsumerTest {private ApplicationContext container;@Beforepublic void setup() {container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");}@Testpublic void consume() throws Exception {SpringConsumer consumer = container.getBean(SpringConsumer.class);Thread.sleep(200 * 1000);consumer.destroy();}
}
SpringConsumerTest 类模拟消息消费者者接收消息,在 consume 方法返回之前需要让当前线程睡眠一段时间,使消费者程序继续存活才能监听到生产者发送的消息。
分别运行 SpringProducerTest 类 和 SpringConsumerTest 类,在 SpringConsumerTest 的控制台能看到接收的消息:

假如启动两个 SpringConsumerTest 类进程,因为它们属于同一消费者组,在 SpringConsumerTest 的控制台能看到它们均摊到了消息:


二、参考链接
[01] 消息队列之 RocketMQ
相关文章:
549、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ(三)】 2023.02.28
目录一、Spring 整合 RocketMQ1.1 消息生产者1.2 消息消费者1.3 Spring 配置文件1.4 运行实例程序二、参考链接一、Spring 整合 RocketMQ 不同于 RabbitMQ、ActiveMQ、Kafka 等消息中间件,Spring 社区已经通过多种方式提供了对这些中间件产品集成,例如通…...
如何使用SpringBoot ⽇志?
Spring Boot自定义日志的打印:在一个类中先获取到打印日志对象(日志框架提供的日志对象,而日志框架默认已经集成到Spring Boot里了,springboot默认使用 slf4jlogback);注意:得到日志对象Logger ->来自于slf4j2、使用目志对象提…...
山东大学数字图像处理实验:MATLAB的图像显示方法
文章目录MATLAB 学习实验目的实验原理及方法实验内容MATLAB的图像显示方法实验目的实验内容MATLAB 学习 实验目的 了解 MATLAB 的基本功能及操作方法。掌握典型离散信号的 Matlab 产生和显示。 实验原理及方法 在 MATLAB 中, 序列是用矩阵向量表示, 但它没有包含采样信息, …...
Java缓存面试题——Redis解决方案
文章目录1、什么是缓存击穿?该如何解决2、什么是缓存穿透?该如何解决3、什么是缓存雪崩?该如何解决4、什么是BigKey?该如何解决bigkey的危害发现bigkey解决bigkey5、redis过期策略都有哪些?6、讲一讲Redis缓存的数据一…...
Flink:The generic type parameters of ‘Collector‘ are missing 类型擦除
类型擦除问题处理报错日志描述问题描述报错解决其他方法方法一:TypeInformation方法二:TypeHint报错日志描述 报错日志: The generic type parameters of Collector are missing. In many cases lambda methods dont provide enough informa…...
MySQL查询操作
系列文章目录前言一、简单查询SELECT子句SELECT后面之间跟列名DISTINCT,ALL列表达式列更名WHERE子句WHERE子句中可以使用的查询条件比较运算BETWEEN...AND...集合查询:IN模糊查询LIKE空值比较:IS NULL多重条件查询SELECT 的基本结构ORDER BY子句排序聚集…...
Redis-day01-note
Redis-day01-note 文章目录**Redis-day01-note****安装****配置文件详解****数据类型****字符串类型(string)**列表数据类型(List)****与python交互**Redis介绍特点及优点 1、开源的,使用C编写,基于内存且支持持久化 2、高性能的…...
嵌入式C基础知识(19)
时序在前面我们说到当处理器要向外设芯片写数据时,需要先将所需访问的外设的地址放在地址总线上,然后,由译码器将地址总线上的数据转换成片选信号,片选信号则使能目标外设芯片,接下来处理器写数据到数据总线上…...
java 2(程序流程控制)【含例题详解】
java ——程序流程控制 ✍作者:电子科大不知名程序员 🌲专栏:java学习指导 各位读者如果觉得博主写的不错,请诸位多多支持;如果有错误的地方,欢迎在评论区指出 目录java ——程序流程控制分支结构if-elsesw…...
基于Conda完成创建多版本python环境
文章目录基于Conda完成创建多版本python环境基于Conda完成创建多版本python环境 通过cmd打开conda环境 d:\ProgramData\Anaconda3\Scripts\activate创建python3.7的环境 conda create -n py3.7 python3.7产生错误 Collecting package metadata (repodata.json): failed Unav…...
35岁的测试被裁,公司地位还不如00后...
国内的互联网行业发展较快,所以造成了技术研发类员工工作强度比较大,同时技术的快速更新又需要员工不断的学习新的技术。因此淘汰率也比较高,超过35岁的基层研发类员工,往往因为家庭原因、身体原因,比较难以跟得上工作…...
vue H5跳转小程序报错:config:fail,Error: 系统错误,错误码:63002,invalid signature
微信开发者工具下载地址与更新日志 错误码:63002,invalid signature 无效的签名 附录5 微信网页开发 /JS-SDK说明文档 微信 JS 接口签名校验工具 全局返回码说明 排查步骤 确认签名算法正确,可用 http://mp.weixin.qq.com/debug/cgi-bin/sand…...
来面试阿里测开工程师,HR问我未来3-5年规划,我给HR画个大饼。
在面试的过程中是不是经常被面试官问未来几年的职业规划?你会答吗?是不是经常脑袋里一片空白,未来规划?我只是想赚更多的钱啊,哈哈哈,今天我来教大家,如何给面试官画一个大饼,让他吃的不亦乐乎…...
【2373. 矩阵中的局部最大值】
来源:力扣(LeetCode) 描述: 给你一个大小为 n x n 的整数矩阵 grid 。 生成一个大小为 (n - 2) x (n - 2) 的整数矩阵 maxLocal ,并满足: maxLocal[i][j] 等于 grid 中以 i 1 行和 j 1 列为中心的 3 …...
Read book Netty in action(Chapter VII)--ChannelHandler和ChannelPipeline
序言 我们曾经学过了ByteBuf – netty的数据容器,还有ChannelHandler和ChannelPipeline,这一把将他们组合起来,这些组件的交互正是Netty的灵魂所在! ChannelHanlder家族 在详细地学习ChannelHanlder之前,我们将在Ne…...
react的严格模式 和 解决react useEffect执行两次
useEffect执行两次 这个问题,主要是刚接触react的时候发的问题,当时也没总结。现在回过头来再总结一次!!! 文章目录useEffect执行两次前言一、为什么useEffect执行两次1.React的严格模式(模版创建项目&…...
C++中的STL
一、概念 STL,英文全称 standard template library,中文可译为标准模板库或者泛型库,其包含有大量的模板类和模板函数,是 C 提供的一个基础模板的集合,用于完成诸如输入/输出、数学计算等功能。 STL 最初由惠普实验室…...
【沐风老师】3dmax一键窗户生成器插件使用方法详解
3dmax一键窗户生成器插件教程 3dMax一键窗户生成器是一个在3dMax中自动创建3D窗户模型的脚本。它有28种风格的窗户样式,可以在Archviz项目中灵活应用,同时为3D艺术家节省大量时间。 【适用版本】 适用3dMax 2018.2及更高版本 【安装方法】 1.解压缩包&…...
【图像处理】数字图像处理基础(分辨率,像素,显示...)
Table of Contents1.数字图像处理基础1.1 图像表示1.1.1 图像成像模型1.1.2 数字图像的表示a.图像采样b.图像灰度的量化c.算比特数1.2 分辨率1.2.1 空间分辨率1.2.2 灰度分辨率1.3 像素间的关系1.3.1 像素邻域a.4邻域b.4对角邻域c.8邻域1.3.2 像素邻接1.3.3 像素连通1.3.4 像素…...
UE实现相机飞行效果CesiumForUnreal之DynamicPawn飞行原理浅析
文章目录 1.实现目标2.实现过程2.1 FlyTo实现原理与代码2.2 DynamicPawn飞行原理3.参考资料1.实现目标 基于CesiumForUnreal的Dynamic Pawn实现飞行效果GIF动图: 2.实现过程 实现原理较为简单,基于CesiumForUnreal插件中DynamicPawn中的Camera实现相关功能。其中FlyTo直接通…...
龙虎榜——20250610
上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...
Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...
srs linux
下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935,SRS管理页面端口是8080,可…...
从零实现STL哈希容器:unordered_map/unordered_set封装详解
本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说,直接开始吧! 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...
涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战
“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...
html-<abbr> 缩写或首字母缩略词
定义与作用 <abbr> 标签用于表示缩写或首字母缩略词,它可以帮助用户更好地理解缩写的含义,尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时,会显示一个提示框。 示例&#x…...
佰力博科技与您探讨热释电测量的几种方法
热释电的测量主要涉及热释电系数的测定,这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中,积分电荷法最为常用,其原理是通过测量在电容器上积累的热释电电荷,从而确定热释电系数…...
【分享】推荐一些办公小工具
1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由:大部分的转换软件需要收费,要么功能不齐全,而开会员又用不了几次浪费钱,借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...
处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的
修改bug思路: 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑:async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...
