当前位置: 首页 > news >正文

549、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ(三)】 2023.02.28

目录

    • 一、Spring 整合 RocketMQ
      • 1.1 消息生产者
      • 1.2 消息消费者
      • 1.3 Spring 配置文件
      • 1.4 运行实例程序
    • 二、参考链接

一、Spring 整合 RocketMQ

不同于 RabbitMQ、ActiveMQ、Kafka 等消息中间件,Spring 社区已经通过多种方式提供了对这些中间件产品集成,例如通过 spring-jms 整合 ActiveMQ、通过 Spring AMQP 项目下的 spring-rabbit 整合 RabbitMQ、通过 spring-kafka 整合 kafka ,通过他们可以在 Spring 项目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三种方式,一是将消息生产者和消费者定义成 bean 对象交由 Spring 容器管理,二是使用 RocketMQ 社区的外部项目 rocketmq-jms(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms)然后通过 spring-jms 方式集成使用,三是如果你的应用是基于 spring-boot 的,可以使用 RocketMQ 的外部项目 rocketmq-spring-boot-starter(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter)比较方便的收发消息。
总的来讲 rocketmq-jms 项目实现了 JMS 1.1 规范的部分内容,目前支持 JMS 中的发布/订阅模型收发消息。rocketmq-spring-boot-starter 项目目前已经支持同步发送、异步发送、单向发送、顺序消费、并行消费、集群消费、广播消费等特性,如果比较喜欢 Spring Boot 这种全家桶的快速开发框架并且现有特性已满足业务要求可以使用该项目。当然从 API 使用上最灵活的还是第一种方式,下面以第一种方式为例简单看下Spring 如何集成 RocketMQ 的。

1.1 消息生产者

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;public class SpringProducer {private Logger logger = Logger.getLogger(getClass());private String producerGroupName;private String nameServerAddr;private DefaultMQProducer producer;public SpringProducer(String producerGroupName, String nameServerAddr) {this.producerGroupName = producerGroupName;this.nameServerAddr = nameServerAddr;}public void init() throws Exception {logger.info("开始启动消息生产者服务...");//创建一个消息生产者,并设置一个消息生产者组producer = new DefaultMQProducer(producerGroupName);//指定 NameServer 地址producer.setNamesrvAddr(nameServerAddr);//初始化 SpringProducer,整个应用生命周期内只需要初始化一次producer.start();logger.info("消息生产者服务启动成功.");}public void destroy() {logger.info("开始关闭消息生产者服务...");producer.shutdown();logger.info("消息生产者服务已关闭.");}public DefaultMQProducer getProducer() {return producer;}
}

消息生产者就是把生产者 DefaultMQProducer 对象的生命周期分成构造函数、init、destroy 三个方法,构造函数中将生产者组名、NameServer 地址作为变量由 Spring 容器在配置时提供,init 方法中实例化 DefaultMQProducer 对象、设置 NameServer 地址、初始化生产者对象,destroy 方法用于生产者对象销毁时清理资源。

1.2 消息消费者

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;public class SpringConsumer {private Logger logger = Logger.getLogger(getClass());private String consumerGroupName;private String nameServerAddr;private String topicName;private DefaultMQPushConsumer consumer;private MessageListenerConcurrently messageListener;public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {this.consumerGroupName = consumerGroupName;this.nameServerAddr = nameServerAddr;this.topicName = topicName;this.messageListener = messageListener;}public void init() throws Exception {logger.info("开始启动消息消费者服务...");//创建一个消息消费者,并设置一个消息消费者组consumer = new DefaultMQPushConsumer(consumerGroupName);//指定 NameServer 地址consumer.setNamesrvAddr(nameServerAddr);//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅指定 Topic 下的所有消息consumer.subscribe(topicName, "*");//注册消息监听器consumer.registerMessageListener(messageListener);// 消费者对象在使用之前必须要调用 start 初始化consumer.start();logger.info("消息消费者服务启动成功.");}public void destroy(){logger.info("开始关闭消息消费者服务...");consumer.shutdown();logger.info("消息消费者服务已关闭.");}public DefaultMQPushConsumer getConsumer() {return consumer;}}

同消息生产者类似,消息消费者是把生产者 DefaultMQPushConsumer 对象的生命周期分成构造函数、init、destroy 三个方法,具体含义在介绍 Java 访问 RocketMQ 实例时已经介绍过了,不再赘述。当然,有了消费者对象还需要消息监听器在接收到消息后执行具体的处理逻辑。

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.List;public class MessageListener implements MessageListenerConcurrently {private Logger logger = Logger.getLogger(getClass());public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (list != null) {for (MessageExt ext : list) {try {logger.info("监听到消息 : " + new String(ext.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}

消息监听器类就是把前面 Java 示例中注册消息监听器时声明的匿名内部类代码抽取出来定义成单独一个类而已。

1.3 Spring 配置文件

因为只使用 Spring 框架集成,所以除了 Sping 框架核心 jar 包外不需要额外添加依赖包了。本例中将消息生产者和消息消费者分成两个配置文件,这样能更好的演示收发消息的效果。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsd"><bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy"><constructor-arg name="nameServerAddr" value="localhost:9876"/><constructor-arg name="producerGroupName" value="spring_producer_group"/></bean>
</beans>

消息生产者配置很简单,定义了一个消息生产者对象,该对象初始化时调用 init 方法,对象销毁前执行 destroy 方法,将 Name Server 地址和生产者组配置好。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsd"><bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" /><bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy"><constructor-arg name="nameServerAddr" value="localhost:9876"/><constructor-arg name="consumerGroupName" value="spring_consumer_group"/><constructor-arg name="topicName" value="spring-rocketMQ-topic" /><constructor-arg name="messageListener" ref="messageListener" /></bean></beans>

消息消费者同消息生产者配置类似,多了一个消息监听器对象的定义和绑定。

1.4 运行实例程序

按前述步骤 启动 Name Server 和 Broker,接着运行消息生产者和消息消费者程序,简化起见我们用两个单元测试类模拟这两个程序:

package org.study.mq.rocketMQ.spring;import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringProducerTest {private ApplicationContext container;@Beforepublic void setup() {container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");}@Testpublic void sendMessage() throws Exception {SpringProducer producer = container.getBean(SpringProducer.class);for (int i = 0; i < 20; i++) {//创建一条消息对象,指定其主题、标签和消息内容Message msg = new Message("spring-rocketMQ-topic",null,("Spring RocketMQ demo " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */);//发送消息并返回结果SendResult sendResult = producer.getProducer().send(msg);System.out.printf("%s%n", sendResult);}}
}

SpringProducerTest 类模拟消息生产者发送消息。

package org.study.mq.rocketMQ.spring;import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringConsumerTest {private ApplicationContext container;@Beforepublic void setup() {container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");}@Testpublic void consume() throws Exception {SpringConsumer consumer = container.getBean(SpringConsumer.class);Thread.sleep(200 * 1000);consumer.destroy();}
}

SpringConsumerTest 类模拟消息消费者者接收消息,在 consume 方法返回之前需要让当前线程睡眠一段时间,使消费者程序继续存活才能监听到生产者发送的消息。

分别运行 SpringProducerTest 类 和 SpringConsumerTest 类,在 SpringConsumerTest 的控制台能看到接收的消息:
在这里插入图片描述
假如启动两个 SpringConsumerTest 类进程,因为它们属于同一消费者组,在 SpringConsumerTest 的控制台能看到它们均摊到了消息:
在这里插入图片描述
在这里插入图片描述

二、参考链接

[01] 消息队列之 RocketMQ

相关文章:

549、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ(三)】 2023.02.28

目录一、Spring 整合 RocketMQ1.1 消息生产者1.2 消息消费者1.3 Spring 配置文件1.4 运行实例程序二、参考链接一、Spring 整合 RocketMQ 不同于 RabbitMQ、ActiveMQ、Kafka 等消息中间件&#xff0c;Spring 社区已经通过多种方式提供了对这些中间件产品集成&#xff0c;例如通…...

如何使用SpringBoot ⽇志?

Spring Boot自定义日志的打印:在一个类中先获取到打印日志对象&#xff08;日志框架提供的日志对象&#xff0c;而日志框架默认已经集成到Spring Boot里了&#xff0c;springboot默认使用 slf4jlogback);注意&#xff1a;得到日志对象Logger ->来自于slf4j2、使用目志对象提…...

山东大学数字图像处理实验:MATLAB的图像显示方法

文章目录MATLAB 学习实验目的实验原理及方法实验内容MATLAB的图像显示方法实验目的实验内容MATLAB 学习 实验目的 了解 MATLAB 的基本功能及操作方法。掌握典型离散信号的 Matlab 产生和显示。 实验原理及方法 在 MATLAB 中, 序列是用矩阵向量表示, 但它没有包含采样信息, …...

Java缓存面试题——Redis解决方案

文章目录1、什么是缓存击穿&#xff1f;该如何解决2、什么是缓存穿透&#xff1f;该如何解决3、什么是缓存雪崩&#xff1f;该如何解决4、什么是BigKey&#xff1f;该如何解决bigkey的危害发现bigkey解决bigkey5、redis过期策略都有哪些&#xff1f;6、讲一讲Redis缓存的数据一…...

Flink:The generic type parameters of ‘Collector‘ are missing 类型擦除

类型擦除问题处理报错日志描述问题描述报错解决其他方法方法一&#xff1a;TypeInformation方法二&#xff1a;TypeHint报错日志描述 报错日志&#xff1a; The generic type parameters of Collector are missing. In many cases lambda methods dont provide enough informa…...

MySQL查询操作

系列文章目录前言一、简单查询SELECT子句SELECT后面之间跟列名DISTINCT,ALL列表达式列更名WHERE子句WHERE子句中可以使用的查询条件比较运算BETWEEN...AND...集合查询&#xff1a;IN模糊查询LIKE空值比较&#xff1a;IS NULL多重条件查询SELECT 的基本结构ORDER BY子句排序聚集…...

Redis-day01-note

Redis-day01-note 文章目录**Redis-day01-note****安装****配置文件详解****数据类型****字符串类型(string)**列表数据类型&#xff08;List&#xff09;****与python交互**Redis介绍特点及优点 1、开源的&#xff0c;使用C编写&#xff0c;基于内存且支持持久化 2、高性能的…...

嵌入式C基础知识(19)

时序在前面我们说到当处理器要向外设芯片写数据时&#xff0c;需要先将所需访问的外设的地址放在地址总线上&#xff0c;然后&#xff0c;由译码器将地址总线上的数据转换成片选信号&#xff0c;片选信号则使能目标外设芯片&#xff0c;接下来处理器写数据到数据总线上&#xf…...

java 2(程序流程控制)【含例题详解】

java ——程序流程控制 ✍作者&#xff1a;电子科大不知名程序员 &#x1f332;专栏&#xff1a;java学习指导 各位读者如果觉得博主写的不错&#xff0c;请诸位多多支持&#xff1b;如果有错误的地方&#xff0c;欢迎在评论区指出 目录java ——程序流程控制分支结构if-elsesw…...

基于Conda完成创建多版本python环境

文章目录基于Conda完成创建多版本python环境基于Conda完成创建多版本python环境 通过cmd打开conda环境 d:\ProgramData\Anaconda3\Scripts\activate创建python3.7的环境 conda create -n py3.7 python3.7产生错误 Collecting package metadata (repodata.json): failed Unav…...

35岁的测试被裁,公司地位还不如00后...

国内的互联网行业发展较快&#xff0c;所以造成了技术研发类员工工作强度比较大&#xff0c;同时技术的快速更新又需要员工不断的学习新的技术。因此淘汰率也比较高&#xff0c;超过35岁的基层研发类员工&#xff0c;往往因为家庭原因、身体原因&#xff0c;比较难以跟得上工作…...

vue H5跳转小程序报错:config:fail,Error: 系统错误,错误码:63002,invalid signature

微信开发者工具下载地址与更新日志 错误码&#xff1a;63002,invalid signature 无效的签名 附录5 微信网页开发 /JS-SDK说明文档 微信 JS 接口签名校验工具 全局返回码说明 ​ 排查步骤 确认签名算法正确&#xff0c;可用 http://mp.weixin.qq.com/debug/cgi-bin/sand…...

来面试阿里测开工程师,HR问我未来3-5年规划,我给HR画个大饼。

在面试的过程中是不是经常被面试官问未来几年的职业规划?你会答吗&#xff1f;是不是经常脑袋里一片空白&#xff0c;未来规划&#xff1f;我只是想赚更多的钱啊&#xff0c;哈哈哈&#xff0c;今天我来教大家&#xff0c;如何给面试官画一个大饼&#xff0c;让他吃的不亦乐乎…...

【2373. 矩阵中的局部最大值】

来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 给你一个大小为 n x n 的整数矩阵 grid 。 生成一个大小为 (n - 2) x (n - 2) 的整数矩阵 maxLocal &#xff0c;并满足&#xff1a; maxLocal[i][j] 等于 grid 中以 i 1 行和 j 1 列为中心的 3 …...

Read book Netty in action(Chapter VII)--ChannelHandler和ChannelPipeline

序言 我们曾经学过了ByteBuf – netty的数据容器&#xff0c;还有ChannelHandler和ChannelPipeline&#xff0c;这一把将他们组合起来&#xff0c;这些组件的交互正是Netty的灵魂所在&#xff01; ChannelHanlder家族 在详细地学习ChannelHanlder之前&#xff0c;我们将在Ne…...

react的严格模式 和 解决react useEffect执行两次

useEffect执行两次 这个问题&#xff0c;主要是刚接触react的时候发的问题&#xff0c;当时也没总结。现在回过头来再总结一次&#xff01;&#xff01;&#xff01; 文章目录useEffect执行两次前言一、为什么useEffect执行两次1.React的严格模式&#xff08;模版创建项目&…...

C++中的STL

一、概念 STL&#xff0c;英文全称 standard template library&#xff0c;中文可译为标准模板库或者泛型库&#xff0c;其包含有大量的模板类和模板函数&#xff0c;是 C 提供的一个基础模板的集合&#xff0c;用于完成诸如输入/输出、数学计算等功能。 STL 最初由惠普实验室…...

【沐风老师】3dmax一键窗户生成器插件使用方法详解

3dmax一键窗户生成器插件教程 3dMax一键窗户生成器是一个在3dMax中自动创建3D窗户模型的脚本。它有28种风格的窗户样式&#xff0c;可以在Archviz项目中灵活应用&#xff0c;同时为3D艺术家节省大量时间。 【适用版本】 适用3dMax 2018.2及更高版本 【安装方法】 1.解压缩包&…...

【图像处理】数字图像处理基础(分辨率,像素,显示...)

Table of Contents1.数字图像处理基础1.1 图像表示1.1.1 图像成像模型1.1.2 数字图像的表示a.图像采样b.图像灰度的量化c.算比特数1.2 分辨率1.2.1 空间分辨率1.2.2 灰度分辨率1.3 像素间的关系1.3.1 像素邻域a.4邻域b.4对角邻域c.8邻域1.3.2 像素邻接1.3.3 像素连通1.3.4 像素…...

UE实现相机飞行效果CesiumForUnreal之DynamicPawn飞行原理浅析

文章目录 1.实现目标2.实现过程2.1 FlyTo实现原理与代码2.2 DynamicPawn飞行原理3.参考资料1.实现目标 基于CesiumForUnreal的Dynamic Pawn实现飞行效果GIF动图: 2.实现过程 实现原理较为简单,基于CesiumForUnreal插件中DynamicPawn中的Camera实现相关功能。其中FlyTo直接通…...

多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度​

一、引言&#xff1a;多云环境的技术复杂性本质​​ 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时&#xff0c;​​基础设施的技术债呈现指数级积累​​。网络连接、身份认证、成本管理这三大核心挑战相互嵌套&#xff1a;跨云网络构建数据…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

微信小程序 - 手机震动

一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注&#xff1a;文档 https://developers.weixin.qq…...

【HTML-16】深入理解HTML中的块元素与行内元素

HTML元素根据其显示特性可以分为两大类&#xff1a;块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...

今日科技热点速览

&#x1f525; 今日科技热点速览 &#x1f3ae; 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售&#xff0c;主打更强图形性能与沉浸式体验&#xff0c;支持多模态交互&#xff0c;受到全球玩家热捧 。 &#x1f916; 人工智能持续突破 DeepSeek-R1&…...

docker 部署发现spring.profiles.active 问题

报错&#xff1a; org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...

CVE-2020-17519源码分析与漏洞复现(Flink 任意文件读取)

漏洞概览 漏洞名称&#xff1a;Apache Flink REST API 任意文件读取漏洞CVE编号&#xff1a;CVE-2020-17519CVSS评分&#xff1a;7.5影响版本&#xff1a;Apache Flink 1.11.0、1.11.1、1.11.2修复版本&#xff1a;≥ 1.11.3 或 ≥ 1.12.0漏洞类型&#xff1a;路径遍历&#x…...

Java求职者面试指南:计算机基础与源码原理深度解析

Java求职者面试指南&#xff1a;计算机基础与源码原理深度解析 第一轮提问&#xff1a;基础概念问题 1. 请解释什么是进程和线程的区别&#xff1f; 面试官&#xff1a;进程是程序的一次执行过程&#xff0c;是系统进行资源分配和调度的基本单位&#xff1b;而线程是进程中的…...

Webpack性能优化:构建速度与体积优化策略

一、构建速度优化 1、​​升级Webpack和Node.js​​ ​​优化效果​​&#xff1a;Webpack 4比Webpack 3构建时间降低60%-98%。​​原因​​&#xff1a; V8引擎优化&#xff08;for of替代forEach、Map/Set替代Object&#xff09;。默认使用更快的md4哈希算法。AST直接从Loa…...

论文阅读:LLM4Drive: A Survey of Large Language Models for Autonomous Driving

地址&#xff1a;LLM4Drive: A Survey of Large Language Models for Autonomous Driving 摘要翻译 自动驾驶技术作为推动交通和城市出行变革的催化剂&#xff0c;正从基于规则的系统向数据驱动策略转变。传统的模块化系统受限于级联模块间的累积误差和缺乏灵活性的预设规则。…...