20241130 RocketMQ本机安装与SpringBoot整合
目录
一、RocketMQ简介
???1.1、核心概念
???1.2、应用场景
???1.3、架构设计
2、RocketMQ Server安装
3、RocketMQ可视化控制台安装与使用
4、SpringBoot整合RocketMQ实现消息发送和接收?
? ? ? ? 4.1、添加maven依赖
???4.2、yaml配置
???4.3、生产者
???4.4、消费者
???4.5、接口
???4.6、接口测试
一、RocketMQ****简介
RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历 了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
官方文档:https://rocketmq.apache.org/docs/quick-start/
github中文主页:https://github.com/apache/rocketmq/tree/master/docs/cn
1.1、核心概念
- Topic:消息主题,一级消息类型,生产者向其发送消息。Message:生产者向Topic发送并最终传送给消费者的数据消息的载体。
- 消息属性:生产者可以为消息定义的属性,包含Message Key和Tag。
- Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。
- Message ID:消息的全局唯一标识,由消息队列RocketMQ系统自动生成,唯一标识某条消息。
- Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类
- Producer:也称为消息发布者,负责生产并发送消息至Topic。
- Consumer:也称为消息订阅者,负责从Topic接收并消费消息。
- 分区:即Topic Partition,物理上的概念。每个Topic包含一个或多个分区。
- 消费位点:每个Topic会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset;分区的起始位置对应的位置叫做起始位点MinOffset。
- Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订 阅的逻辑一致。
- Group ID:Group的标识。
- 队列:单个Topic下会由一到多个队列来存储消息。
- Exactly-Once****投递语义:Exactly-Once投递语义是指发送到消息系统的消息只能被Consumer处理且仅处理一次,即使Producer重试消息发送导致某消息重复投递,该消息在Consumer也只被消费 一次。
- 集群消费:一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条 消息。
- 广播消费:一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。
- 定时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
- 延时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
- 事务消息:RocketMQ提供类似X/Open XA的分布事务功能,通过消息队列RocketMQ的事务消息能达到分布式事务的最终一致。
- 顺序消息:RocketMQ提供的一种按照顺序进行发布和消费的消息类型,分为全局顺序消息和分区顺序消息。
- 全局顺序消息:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
- 分区顺序消息:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Message Key是完全不同的概念。
- 消息堆积:Producer已经将消息发送到消息队列RocketMQ的服务端,但由于Consumer消费能力有限,未能在短时间内将所有消息正确消费掉,此时在消息队列RocketMQ的服务端保存着未被消费的消息,该状态即消息堆积。
- 消息过滤:Consumer可以根据消息标签(Tag)对消息进行过滤,确保Consumer最终只接收被过滤后的消息类型。消息过滤在消息队列RocketMQ的服务端完成。
- 消息轨迹:在一条消息从Producer发出到Consumer消费处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从Producer发出,经由消息队列RocketMQ服务端,投递给Consumer的完整链路,方便定位排查问题。
- 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅的Topic的消费进度,设置完成后Consumer将接收设定时间点之后由Producer发送到消息队列RocketMQ服务端的消息。
- 死信队列:死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列RocketMQ会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明Consumer在正常情况下无法正确地消费该消息。此时,消息队列RocketMQ不会立刻将消息丢弃,而是将这条消息发送到该Consumer对应的特殊队列中。消息队RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
1.2、应用场景
- 削峰填谷:诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ可提供削峰填谷的服务来解决该问题。
- 异步解耦:交易系统作为淘宝和天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列RocketMQ可实现异步通信和应用解耦,确保主站业务的连续性。
- 顺序收发:细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列RocketMQ提供的顺序消息即保证消息FIFO。
- 分布式事务一致性:交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列RocketMQ的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
- 大数据分析:数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列RocketMQ与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。
- 分布式缓存同步:天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列RocketMQ构建分布式缓存,实时通知商品数据的变化。
1.3、架构设计


RocketMQ架构上主要分为四部分,如上图所示:
**Producer:**消息发布的角色,支持分布式集群方式部署。 Producer 通过 MQ 的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
**Consumer:**消息消费的角色,支持分布式集群方式部署。支持以 push 推, pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
NameServer: NameServer 是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的zookeeper,支持 Broker 的动态注册与发现。主要包括两个功能: Broker 管理, NameServer 接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker仍然可以向其它 NameServer 同步其路由信息, Producer,Consumer仍然可以动态感知
Broker的路由的信息。BrokerServer: Broker 主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
**1. Remoting Module:**整个Broker 的实体,负责处理来自 clients端的请求。
**2. Client Manager:**负责管理客户端(Producer/Consumer) 和维护 Consumer 的 Topic订阅信息
**3. Store Service:**提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
4. HA Service: 高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
5. Index Service: 根据特定的Message key 对投递到 Broker的消息进行索引服务,以提供消息的快速查询。
2、RocketMQ Server****安装
RocketMQ依赖Java环境,要求有JDK 1.8以上版本;
支持Windows和Linux平台;支持源码方式安装和使用已经编译好的安装包安装;
我们用windows平台安装RocketMQ Server编译好的安装包,来讲解RocketMQ;
下载地址:https://rocketmq.apache.org/dowloading/releases/
我们选择4.9.0二进制版本;

解压后的目录:

benchmark:里面是测试Demo;
bin:可执行脚本;conf:配置文件;
lib:依赖的jar包;
我们把rocketmq解压包放到D盘soft目录下,重命名 rocketmq ;
第一步:系统环境变量加两个配置
ROCKETMQ_HOME=“D:soft ocketmq”
NAMESRV_ADDR=“localhost:9876”


第二步:启动 Name Server
进入命令行执行:
.inmqnamesrv.cmd

第三步:启动 Broker
进入命令行执行:
.inmqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

第四步:发送和接收消息测试
进入命令行消息发送执行:
.in ools.cmd org.apache.rocketmq.example.quickstart.Producer

消息发送成功;
进入命令行消息接收执行:
.in ools.cmd org.apache.rocketmq.example.quickstart.Consumer

消息接收成功;
第五步:关闭服务
windows 下直接关闭命令行窗口即可;
3、RocketMQ****可视化控制台安装与使用
RocketMQ 提供了一些扩展项目支持,地址: https://github.com/apache/rocketmq-externals
其中一个 rocketmq - connect - console 项目,就是我们需要的可视化控制台;

我们把整个项目下载下来,打开 rocketmq - console 项目;项目是 SpringBoot 开发;

打开 application.properties 配置文件,我们至少需要修改两个配置项;
server.port=8080 ,这个是可视化项目启动端口,我们改成 19876 ;
rocketmq.config.namesrvAddr= ,这个是指定 nameServer 地址和端口,我们暂时先搞成localhost:9876,等后面搞集群的话,要再修改;

修改后保存,在项目目录下进入命令行,执行:
D:soft ocketmqClient ocketmq-externals-master ocketmq-console
mvn clean package -Dmaven.test.skip=true

打包执行完后,在 target 目录,会生成一个可运行 jar rocketmq - console - ng - 2.0.0.jar

我们运行这个 jar ,进入命令行执行:
java -jar rocketmq-console-ng-2.0.0.jar

启动成功后,浏览器输入:http://localhost:19876/

说明一切OK;
4、SpringBoot整合RocketMQ****实现消息发送和接收
4.1、添加maven依赖
org.apache.rocketmq rocketmq-spring-boot-starter 2.2.3
4.2、yaml配置
rocketmq:
name-server: localhost:9876
producer:
# 发送同一类消息的设置为同一个group,保证唯一
group: producer-group
# 发送消息超时时间,默认3000
sendMessageTimeout: 5000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 异步消息重试此处,默认2
retryTimesWhenSendAsyncFailed: 2
# 消息最大长度,默认1024 * 1024 * 4(默认4M)
maxMessageSize: 4096
# 压缩消息阈值,默认4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在内部发送失败时重试另一个broker,默认false
retryNextServer: false
consumer:
group: consumer-group # 消费者组名

4.3、生产者
@Service
public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic, String message) {rocketMQTemplate.convertAndSend(topic, message);System.out.println("Message sent: " + message);}
}
4.4、消费者
@Service
@RocketMQMessageListener(topic = "your-topic", consumerGroup = "consumer-group")
public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message: " + message);}
}
4.5、接口
@GetMapping("/send")public String sendMessage(@RequestParam String topic, @RequestParam String message) {rocketMQProducer.sendMessage(topic, message);return "Message sent: " + message;}
4.6、接口测试


至此测试完成!
相关文章:
20241130 RocketMQ本机安装与SpringBoot整合
目录 一、RocketMQ简介 ???1.1、核心概念 ???1.2、应用场景 ???1.3、架构设计 2、RocketMQ Server安装 3、RocketMQ可视化控制台安装与使用 4、SpringBoot整合RocketMQ实现消息发送和接收? ? ? ? ? 4.1、添加maven依赖 ???4.2、yaml配置 ???4.3、…...
FFmpeg进化论:从av_register_all手动注册到编译期自动加载的技术跃迁
介绍 音视频开发都知道 FFmpeg,因此对 av_register_all 这个 API 都很熟悉,但ffmpeg 4.0 版本开始就已经废弃了,是旧版本中用于全局初始化的重要接口。 基本功能 核心作用:av_register_all() 用于注册所有封装器(muxer)、解封装器(demuxer)和协议处理器(protocol),…...
Http升级为Https - 开发/测试服环境
1.应用场景 主要用于开发/测试服环境将http升级为https, 防止前端web(浏览器)出现Mixed Content报错; 2.学习/操作 1.文档阅读 deepseek 问答; 2.整理输出 报错信息: Mixed Content: The page at <URL> was loaded over HTTPS, but requested an insecure XMLHttpRequ…...
C语言预编译
大家好,这里是小编的博客频道 小编的博客:就爱学编程 很高兴在CSDN这个大家庭与大家相识,希望能在这里与大家共同进步,共同收获更好的自己!!! 本文目录 引言正文一、预处理的作用与流程…...
算法刷题-字符串-151.反转单词
题目 给一串字符串,里面有若干单词,以空格界定单词的结束,翻转其中的单词 输入:s " hello world " 输出:“world hello” 需要注意的是,给定的字符串可能存在头空格、尾空格以及中间的空格数量…...
单片机裸机编程:状态机与其他高效编程框架
在单片机裸机编程中,状态机是一种非常强大的工具,能够有效管理复杂的逻辑和任务切换。除了状态机,还有其他几种编程模式可以在不使用 RTOS 的情况下实现高效的程序设计。以下是一些常见的方法: 1. 状态机编程 状态机通过定义系统…...
图表控件Aspose.Diagram入门教程:使用 Python 将 VSDX 转换为 PDF
将VSDX转换为PDF可让用户轻松共享图表。PDF 文件保留原始文档的布局和设计。它们广泛用于演示文稿、报告和文档。在这篇博文中,我们将探讨如何在 Python 中将 VSDX 转换为 PDF。 本文涵盖以下主题: Python VSDX 到 PDF 转换器库使用 Python 将 VSDX 转…...
DPVS-1:编译安装DPVS (ubuntu22.04)
操作系统 rootubuntu22:~# lsb_release -a No LSB modules are available. Distributor ID: Ubuntu Description: Ubuntu 22.04.3 LTS Release: 22.04 Codename: jammy rootubuntu22:~# 前置软件准备 apt install git apt install meson apt install gcc ap…...
即将发布书籍 - Yocto项目实战教程:高效定制嵌入式Linux系统
以下这本书《Yocto项目实战教程:高效定制嵌入式Linux系统》即将发布,现在请哪位大佬出山写一个序或者推荐,有兴趣的大佬,请联系我! Git仓库地址: https://github.com/jerrysundev/Yocto-Project-Book.git …...
Git 常用指令及其说明
配置相关 # 配置全局用户名 git config --global user.name "YourUsername"# 配置全局邮箱 git config --global user.email "your.emailexample.com"说明:这两条命令用于设置 Git 全局的用户名和邮箱,在提交代码时,这些…...
nginx代理后502
直接访问 https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions正常 使用nginx代理后访问出现502 server {listen 9999;server_name 172.21.3.78;location ^~ /compatible-mode {proxy_pass https://dashscope.aliyuncs.com;}location / {proxy_pass…...
大模型WebUI:Gradio全解12——LangChain原理及其agent构建Gradio(1)
大模型WebUI:Gradio全解12——LangChain原理及其agent构建Gradio(1) 前言本篇摘要12. LangChain原理及其agent构建Gradio12.1 LangChain概念及优势分析12.1.1 概念12.1.2 标准化组件接口1. 示例:聊天模型2. 示例:检索器12.1.3 编排组件12.1.4 便于部署12.1.5 可观测性和评…...
【Unity】鱼群效果模拟
鱼群效果模拟 文章目录 鱼群效果模拟Boid算法实现方式version1_CPUversion2_GPUversion3_Multilaterationversion4_Bitonic_Sorting (GPU友好)version5_Skinning (TODO) 细节项优化项参考链接 Boid算法 Boid算法是一种模拟群体行…...
PHP入门基础学习五(函数1)
函数 一、概念 1、什么是函数? 函数:封装一段用于完成特定功能的代码 当使用一个函数时,只需关心函数的参数和返回值,就可以完成一个特定的功能 2、php中的函数 PHP 的真正威力源自于它的函数,PHP 中提供了超过 1000 个内建的函数。 php函数分为: 系统内部函数和自…...
微信小程序 - 页面跳转(wx.navigateTo、wx.redirectTo、wx.switchTab、wx.reLaunch)
API 跳转 1、wx.navigateTo (1)基本介绍 功能:保留当前页面,跳转到应用内的某个页面,使用该方法跳转后可以通过返回按钮返回到原页面 使用场景:适用于需要保留当前页面状态,后续还需返回的情…...
Typora的Github主题美化
[!note] Typora的Github主题进行一些自己喜欢的修改,主要包括:字体、代码块、表格样式 美化前: 美化后: 一、字体更换 之前便看上了「中文网字计划」的「朱雀仿宋」字体,于是一直想更换字体,奈何自己拖延症…...
2.3 变量
版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 变量是用来存放某个值的数据,它可以表示一个数字、一个字符串、一个结构、一个类等。变量包含名称、类型和值。在代码中…...
Docker:Docker从入门到精通(一)- Docker简介
一、前言 通过本专栏的学习,我们将了解 1. 掌握Docker基础知识,能够理解Docker镜像与容器的概念 2. 完成Docker安装与启动 3. 掌握Docker镜像与容器相关命令 4. 掌握Tomcat Nginx 等软件的常用应用的安装 5. 掌握docker迁移与备份相…...
【复习】Redis
数据结构 Redis常见的数据结构 String:缓存对象Hash:缓存对象、购物车List:消息队列Set:点赞、共同关注ZSet:排序 Zset底层? Zset底层的数据结构是由压缩链表或跳表实现的 如果有序集合的元素 < 12…...
在Spring Boot+Vue前后端分离的项目中使用JWT实现基本的权限校验
说明 在 Spring Boot + Vue 前后端分离的项目中,如果不使用第三方服务(如 Spring Security、Shiro 等),可以通过自定义实现基本的权限校验。 使用JWT实现步骤 以下是实现步骤: 1. 设计权限模型 通常权限模型包括: 用户(User):系统的使用者。角色(Role):用户的权…...
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...
macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用
文章目录 问题现象问题原因解决办法 问题现象 macOS启动台(Launchpad)多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显,都是Google家的办公全家桶。这些应用并不是通过独立安装的…...
【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验
系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...
大模型多显卡多服务器并行计算方法与实践指南
一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...
sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!
简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求,并检查收到的响应。它以以下模式之一…...
招商蛇口 | 执笔CID,启幕低密生活新境
作为中国城市生长的力量,招商蛇口以“美好生活承载者”为使命,深耕全球111座城市,以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子,招商蛇口始终与城市发展同频共振,以建筑诠释对土地与生活的…...
基于Java+VUE+MariaDB实现(Web)仿小米商城
仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意:运行前…...
探索Selenium:自动化测试的神奇钥匙
目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...
给网站添加live2d看板娘
给网站添加live2d看板娘 参考文献: stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下,文章也主…...
pycharm 设置环境出错
pycharm 设置环境出错 pycharm 新建项目,设置虚拟环境,出错 pycharm 出错 Cannot open Local Failed to start [powershell.exe, -NoExit, -ExecutionPolicy, Bypass, -File, C:\Program Files\JetBrains\PyCharm 2024.1.3\plugins\terminal\shell-int…...
