RocketMQ之(一)RocketMQ入门
一、RocketMQ入门
- 一、RocketMQ 介绍
- 1.1 RocketMQ 是什么?
- 1.2 RocketMQ 应用场景
- 01、应用解耦
- 02、流量削峰
- 03、数据分发
- 1.3 RocketMQ 核心组成
- 01、NameServer
- 02、Broker
- 03、Producer
- 04、Consumer
- 1.6 运转流程
- 1.5 RocketMQ 架构
- 01、NameServer 集群
- 02、Broker 集群
- 03、Producer 集群
- 04、Consumer 集群
- 07、集群工作流程
- 06、集群间的交互方式
- 1.6 RocketMQ 优缺点
- 01、优点
- 02、缺点
- 1.7 各种 MQ 比较
- 二、RocketMQ 安装(Linux 版本)
- 2.1 环境要求
- 2.2 安装步骤
- 01、上传安装包
- 02、解压安装包
- 03、参数配置
- 2.3 目录介绍
- 2.4 启动 RocketMQ
- 2.5 测试 RocketMQ
- 2.6 关闭 RocketMQ
- 三、rocketmq-console 集群监控平台搭建
- 3.1 简介
- 3.2 搭建集群监控平台
- 01、下载
- 02、上传解压
- 03、修改配置参数
- 04、打包
- 05、启动和访问
- 06、问题点
- 四、RocketMQ 发送消息基本样例
- 4.1 普通消息发送
- 4.2 普通消息消费
一、RocketMQ 介绍
1.1 RocketMQ 是什么?
RocketMQ 是一款纯 java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
1.2 RocketMQ 应用场景
消息队列是一种"先进先出"的数据结构,其应用场景主要包含以下三个方面:
01、应用解耦
系统的耦合性越高,容错性就越低。
以电商应用为例:用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

使用消息队列解耦,系统的耦合性就会提高了。比如:如果物流系统发生故障,需要几分钟才能修复好,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

02、流量削峰
应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列后,可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提升系统的稳定性和用户体验。

一般情况下,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验。而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样的体验应该要好很多。

处于经济考量的目的:
业务系统正常时段的 QPS 如果是 1000,流量最高峰是 10000,为了应对流量高峰配置高性能的服务器显然不划算,这时就可以考虑使用消息队列对峰值流量削峰。
03、数据分发
通过消息队列可以让数据在多个系统之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。


1.3 RocketMQ 核心组成
RocketMQ 主要有四大核心组成部分:NameServer、Broker、Producer、Consumer。

Topic:区分消息的种类。一个发送者可以发送消息给一个或多个 Topic,一个消息的接收者可以订阅一个或多个 Topic 消息。
Message Queue:相当于是 Topic 的分区,用于并行发送和接收消息。
01、NameServer
NameServer 是一个几乎无状态节点,可集群部署,节点之间没有任何信息同步。所以 RocketMQ 需要先启动 NameServer 再启动 Broker。
-
作用
NameServer 是整个 RocketMQ 的 "大脑",它相当于是服务注册中心的角色,用来管理 Broker。举例:各个邮局的管理机构。每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息,生产者和消费者通过 NameServer 去获取整个 Broker 集群的路由信息,从而进行消息的投递和消费。
每个 Broker 在启动的时候会到 NameServer 中注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,进而和 Broker 取得连接。Consumer 也会定时获取 Topic 的路由信息。所以从功能上看应该是和 ZooKeeper 差不多的,但是据说 RocketMQ 的早期版本确实是使用的ZooKeeper ,后来改为了自己实现 NameServer。
-
与 ZooKeeper 的区别
NameServer 和 ZooKeeper 的作用大致是相同的。从宏观上来看,NameServer 做的东西很少,就是保存一些运行数据,NameServer 之间不互相连,这就需要 Broker 端连接所有的 NameServer,运行数据的改动要发送到每一个 NameServer ,从而来保证运行数据的一致性(这个一致性确实有点弱),这样就变成了 NameServer很轻量级,但是 Broker 端就要做更多的东西了。
但是在 ZooKeeper 中,Broker 只需要连接其中的一台机器,运行数据分发、一致性都交给了 ZooKeeper 来完成。
-
高可用保障
Broker 在启动时向所有 NameServer 注册(主要是服务器地址等) ,生产者在发送消息之前先从NameServer 获取 Broker 服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。
NameServer 与每台 Broker 服务保持长连接,并间隔 30S 检查 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除,这样就可以实现 RocketMQ 的高可用。
02、Broker
Broker 是消息存储中心,主要作用是接收来自 Producer 的消息并存储,Consumer 从这里取得消息。举例:邮局。
它还存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等。从部署结构图中可以看出 Broker 有 Master 和 Slave 两种类型,Master 既可以写又可以读,Slave 只可以读不可以写。
从物理结构上看 Broker 的集群部署方式有四种:单 Master 、多 Master 、多 Master 多 Slave(同步刷盘)、多 Master 多 Slave(异步刷盘)。
-
单 Master
这种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
-
多 Master
所有消息都是 Master,没有 Slave。这种方式优缺点:
- 优点:配置简单,单个 Master 宕机或重启维护对应用无影响。在磁盘配置为 RAID10 时,即使机器宕机不可恢复的情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
-
多 Master 多 Slave(异步复制)
异步:先响应后再存入磁盘。每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备之间有毫秒级消息延迟。这种方式优缺点:
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受到影响。同时 Master 宕机后,消费者仍然可以从 Salve 消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
- 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
-
多 Master 多 Slave(同步双写)
同步:立刻存入磁盘后响应。每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功。这种模式的优缺点:
- 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
- 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的 RT 会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
-
高可用保障
每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时(每隔30s)注册 Topic 信息到所有 NameServer。NameServer定时(每隔10s)扫描所有存活 Broker 的连接,如果 NameServer 超过2分钟没有收到心跳,则 NameServer 断开与 Broker 的连接。
03、Producer
Producer 也称为消息发布者,负责生产并发送消息至 Topic。举例:发信者。生产者向 brokers 发送由业务应用程序系统生成的消息。RocketMQ 提供了发送:同步、异步和单向(one-way)的多种范例。
-
同步发送
同步发送消息指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,比如重要通知邮件、营销短信等。
-
异步发送
异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。假如过一段时间检测到某个信息发送失败,可以选择重新发送。
-
单向发送
单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。
-
生产者组
生产者组(Producer Group)是一类 Producer 的集合,这类 Producer 通常发送一类消息并且发送逻辑一致,所以将这些 Producer 分组在一起。从部署结构上看生产者通过 Producer Group 的名字来标记自己是一个集群。
-
高可用保障
Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。
Producer 每隔30s(由 ClientConfig 的 pollNameServerInterval )从 Nameserver 获取所有 topic 队列的最新情况,这意味着如果 Broker 不可用,Producer 最多30s能够感知,在此期间内发往 Broker 的所有消息都会失败。
Producer 每隔30s(由 ClientConfig 中 heartbeatBrokerInterval 决定)向所有关联的 broker 发送心跳,Broker 每隔10s扫描所有存活的连接,如果 Broker 在2分钟内没有收到心跳数据,则关闭与 Producer 的连接。
04、Consumer
Consumer 也称为消息订阅者,负责从 Topic 接收并消费消息。举例:收信者。消费者从 brokers 里拉取信息并将其输入应用程序中。
-
消费者组
消费者组(Consumer Group)是一类 Consumer 的集合名称,这类 Consumer 通常消费同一类消息并且消费逻辑一致,所以将这些 Consumer 分组在一起。消费者组与生产者组类似,都是将相同角色的分组在一起并命名。
RocketMQ 中的消息有个特点:同一条消息,只能被某一消费组其中的一台机器消费,但是可以同时被不同的消费组消费。
-
高可用保障
Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
Consumer 每隔30s从 Nameserver 获取 topic 的最新队列情况,这意味着 Broker 不可用时,Consumer 最多最需要30s才能感知。
Consumer 每隔30s(由 ClientConfig 中 heartbeatBrokerInterval 决定)向所有关联的 broker 发送心跳,Broker 每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该 Consumer Group 的所有 Consumer 发出通知,Group 内的 Consumer 重新分配队列,然后继续消费。
当 Consumer 得到 master 宕机通知后,转向 slave 消费,slave 不能保证 master 的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦 master 恢复,未同步过去的消息会被最终消费掉。
1.6 运转流程

- NameServer 先启动;
- Broker 启动时向 NameServer 注册;
- 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台 Broker 进行消息发送;
- NameServer 与每台 Broker 服务器保持长连接,并间隔 30S 检测 Broker 是否存活,如果检测到 Broker 宕机(使用心跳机制, 如果检测超120S),则从路由注册表中将其移除;
- 消费者在订阅某个主题的消息之前从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),但是消费者选择从 Broker 中 订阅消息,订阅规则由 Broker 配置决定。
1.5 RocketMQ 架构

RocketMQ 架构图中展示了四个集群:
01、NameServer 集群
提供轻量级的服务发现及路由,每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速存储扩展。
NameServer 是一个功能齐全的服务器,主要包含两个功能:
- Broker 管理,接收来自 Broker 集群的注册请求,提供心跳机制检测 Broker 是否存活;
- 路由管理,每个 NameServer 持有全部有关 Broker 集群和客户端请求队列的路由信息。
02、Broker 集群
通过提供轻量级的 Topic 和 Queue 机制处理消息存储。同时支持推(Push)和拉(Pull)两种模型,包含容错机制。提供强大的峰值填充和以原始时间顺序累积数千亿条消息的能力。此外还提供灾难恢复,丰富的指标统计数据和警报机制,这些都是传统的消息系统缺乏的。
Broker 有几个重要的子模块:
- 远程处理模块,Broker 入口,处理来自客户端的请求;
- 客户端管理(包括消息生产者和消费者),维护消费者的主题订阅;
- 存储服务,提供在物理硬盘上存储和查询消息的简单 API;
- HA 服务,提供主从 Broker 间数据同步;
- 索引服务,通过指定键为消息建立索引并提供快速消息查询。
03、Producer 集群
消息生产者支持分布式部署,分布式生产者通过多种负载均衡模式向 Broker 集群发送消息。
04、Consumer 集群
消息消费者也支持 Push 和 Pull 模型的分布式部署,还支持集群消费和消息广播。提供了实时的消息订阅机制,可以满足大多数消费者的需求。
DefultMQPullConsumer :consumer 定时向 broker 发送请求获取内存数据,避免给 broker 造成巨大的压力。一般会在本地使用定时任务实现。
DefultMQPushConsumer :consumer 向 broker 发送请求,两者保持长链接的状态。broker 会定时(每 5 秒)去查询 consumer 中是否有要订阅的数据,有就将消息推送给 consumer。
无论是 pull 还是 push,其实本质上都是拉取消息。
07、集群工作流程
- 启动 NameServer,NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。
- Broker 启动后,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP + 端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 与 Broker 的映射关系。
- 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
- Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
- Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。
06、集群间的交互方式
- Broker Master 和 Broker Slave 是主从结构,会执行数据同步 Data Sync
每个 Broker 与 NameServer 集群中所有节点建立长连接,定时注册 Topic 信息到所有 NameServer; - Producer 与 NameServer 集群中的其中一个节点(随机)建立长连接,定期从 NameServer 获取 Topic 路由信息,并与提供 Topic 服务的 Broker Master 建立长连接,定时向 Broker 发送心跳;
- Producer 只能将消息发送到 Broker Master,但是 Consumer 同时和 Broker Master 和 Broker Slave 建立长连接,既可以从 Master 订阅消息,也可以从 Slave 订阅消息。
1.6 RocketMQ 优缺点
01、优点
- 单机吞吐量:十万级
- 可用性:非常高,分布式架构
- 消息可靠性:经过参数优化配置,消息可以做到 0 丢失
- 功能支持:MQ 功能较为完善,还是分布式的,扩展性好
- 支持 10 亿级别的消息堆积,不会因为堆积导致性能下降
- 源码是 Java,方便结合公司自己的业务进行二次开发
- 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
- RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验。
02、缺点
- 支持的客户端语言不多,目前仅支持 Java 及 C++,而且 C++ 还不成熟
- 没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
1.7 各种 MQ 比较

二、RocketMQ 安装(Linux 版本)
安装 RocketMQ 版本:4.5.0,我这里使用的是阿里云服务器,也可以在虚拟机上操作。
2.1 环境要求
- Linux 64 位操作系统
- JDK 1.8
- Maven 3.9.0(maven 版本不固定,但是最好不要使用最高版本)
2.2 安装步骤
01、上传安装包
将下载好的 RocketMQ 安装包上传到服务器上:

02、解压安装包
# 解压
unzip rocketmq-all-4.5.0-bin-release.zip# 将解压包移动到指定路径下
mv rocketmq-all-4.5.0-bin-release ../software

03、参数配置
这里需要配置三个文件:
-
/conf/broker.conf
指定 broker 的命名空间地址和当前 broker 监听的 IP:
默认情况下,namesrvAddr = 127.0.0.1:9876,brokerIP1 = 127.0.0.1。 -
/bin/runserver.sh
RocketMQ 默认的虚拟机内存比较大,启动 Broker 如果因为内存不足失败,就需要编辑这两个配置文件,修改 JVM 内存大小:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
-
/bin/runbroker.sh
同上,修改 JVM 内存大小:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
2.3 目录介绍
- bin:启动脚本,包括 shell 脚本和 cmd 脚本
- conf:实例配置文件,包括 broker 配置文件、logback 配置文件等
- lib:依赖 jar 包,包括 Netty、commons-lang、FastJSON 等
2.4 启动 RocketMQ
启动前先查看进程:

启动命令(在 bin 目录下执行):
# 启动 nameserver
nohup sh mqnamesrv -n 公网IP:9876 &# 启动 broker
nohup sh mqbroker -n 公网IP:9876 -c conf/broker.conf autoCreateTopicEnable=true &
启动后查看进程,有这两个进程即启动成功:

2.5 测试 RocketMQ
-
模拟生产者发送消息
# 生产者 sh tools.sh org.apache.rocketmq.example.quickstart.Producer输入这个命令后,控制台会输出很多的信息,不报错就说明发送成功了:

-
模拟消费者接收消息
# 消费者 sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
2.6 关闭 RocketMQ
# 关闭服务
sh mqshutdown namesrv
sh mqshutdown broker
三、rocketmq-console 集群监控平台搭建
3.1 简介
RocketMQ 有个可视化的管理界面,通过可视化界面,我们可以方便地监控 RocketMQ 集群,并实现很多操作。比如:创建管理 Topic,查看和发送 ,essage等等。
3.2 搭建集群监控平台
01、下载
RocketMQ提供了UI管理工具,名为rocketmq-console,项目地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console。
这个是 rocketmq 的扩展,里面不仅包含控制台的扩展,也包含对大数据 flume、hbase 等组件的对接和扩展。

02、上传解压
- 上传

- 解压
# 解压 unzip rocketmq-console.zip# 移动到 software 目录下 mv rocketmq-console ../software
03、修改配置参数
修改 rocketmq-console\src\main\resources\application.properties 配置文件:

04、打包
进入 rocketmq-console 目录下执行命令:
# 打包
mvn clean package ‐Dmaven.test.skip=true
打包完成后,会在 /software/rocketMQ/rocketmq-console 目录下生成一个 target 文件夹
05、启动和访问
-
启动
进入
/rocketmq-console/target目录下执行:# 指定端口号和命名空间地址 java -jar rocketmq-console-ng-1.0.1.jar --server.port=8088 --rocketmq.config.namesrvAddr=公网IP:9876
-
访问
http://106.15.0.30:8088

06、问题点
-
防火墙
防火墙需要开放访问 RocketMQ 的一系列端口号:
# 查看防火墙状态 systemctl status firewalld# 关闭防火墙 systemctl stop firewalld# 启动防火墙 systemctl start firewalld# 永久开放指定端口号【把用到的端口号都开放】 firewall-cmd --zone=public --add-port=10909/tcp --permanent firewall-cmd --zone=public --add-port=10911/tcp --permanent firewall-cmd --zone=public --add-port=9876/tcp --permanent firewall-cmd --zone=public --add-port=9877/tcp --permanent# 重新加载防火墙 firewall-cmd --reload# 或者重启防火墙 systemctl restart firewalld.service# 查看防火墙信息列表 firewall-cmd --list-all# 只查看防火墙开放端口号列表 firewall-cmd --list-ports除了这一层防火墙之外,阿里云服务器自己还有一层防火墙
iptables,是默认配置的,我们也需要关闭这层防火墙或者开放对应的端口号:# 查看防火墙状态出现的问题 service iptables status# 关闭防火墙 service iptables stop -
安全组
防火墙端口号开放之后,同时也需要在 ECS 服务器安全组中添加端口规则:

四、RocketMQ 发送消息基本样例
引入 jar 包:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.5.0</version>
</dependency>
4.1 普通消息发送
/*** 同步发送消息** @author qiaohaojie* @date 2023/2/20 17:00*/
public class SyncProducer {public static void main(String[] args) throws Exception {// 1. 实例化消息生产者 producerDefaultMQProducer producer = new DefaultMQProducer("group_producer_demo1");// 2. 设置 nameServer 的地址producer.setNamesrvAddr("公网IP:9876");// 关闭 VIP 通道
// producer.setVipChannelEnabled(false);producer.setSendMsgTimeout(3000);// 3. 启动 Producer 实例producer.start();for (int i = 0; i < 10; i++) {// 4. 创建消息 messageMessage message = new Message("Topic_Demo", "Tags", "Hello RocketMQ" + i, "hello".getBytes(RemotingHelper.DEFAULT_CHARSET));// 5. 发送消息SendResult result = producer.send(message);System.out.println(result);}// 6. 关闭 producerproducer.shutdown();}
}
这里的 message 实例中有几个参数:

- topic:代表消息的主题
- tags:主要用于消息过滤
- keys:消息的唯一值
- body:消息体,代表消息的内容

4.2 普通消息消费
/*** 同步发送消息-消费者** @author qiaohaojie* @date 2023/2/20 21:39*/
public class Consumer {public static void main(String[] args) throws Exception {// 1. 创建DefaultMQPushConsumer实例DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer_demo1");// 2. 设置nameServer地址consumer.setNamesrvAddr("公网IP:9876");// 设置消息拉取最大数consumer.setConsumeMessageBatchMaxSize(2);// 3. 设置subscribe,这里是要读取的主题信息consumer.subscribe("Topic_Demo", "*");// 4. 设置消息监听consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 5. 获取消息信息// 迭代消息信息for (MessageExt msg : msgs) {try {// 获取主题String topic = msg.getTopic();// 获取标签String tags = msg.getTags();// 获取信息String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);System.out.println("Consumer消费消息--topic:" + topic + ",targs:" + tags + ",result:" + result);} catch (UnsupportedEncodingException e) {e.printStackTrace();// 消息重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 6. 返回消息读取状态// 消息消费完成return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 开启Consumerconsumer.start();}
}
消费消息时有两个参数:

- topic:指定要消费的主题
- subExpression:消息过滤规则

相关文章:
RocketMQ之(一)RocketMQ入门
一、RocketMQ入门一、RocketMQ 介绍1.1 RocketMQ 是什么?1.2 RocketMQ 应用场景01、应用解耦02、流量削峰03、数据分发1.3 RocketMQ 核心组成01、NameServer02、Broker03、Producer04、Consumer1.6 运转流程1.5 RocketMQ 架构01、NameServer 集群02、Broker 集群03、…...
推荐系统[三]:粗排算法常用模型汇总(集合选择和精准预估),技术发展历史(向量內积,WideDeep等模型)以及前沿技术
1.前言:召回排序流程策略算法简介 推荐可分为以下四个流程,分别是召回、粗排、精排以及重排: 召回是源头,在某种意义上决定着整个推荐的天花板;粗排是初筛,一般不会上复杂模型;精排是整个推荐环节的重中之重,在特征和模型上都会做的比较复杂;重排,一般是做打散或满足…...
vue3 + vite 使用 svg 可改变颜色
文章目录vue3 vite 使用 svg安装插件2、配置插件 vite.config.js3、根据vite配置的svg图标文件夹,建好文件夹,把svg图标放入4、在 src/main.js内引入注册脚本5、创建一个公共SvgIcon.vue组件6.1 全局注册SvgIcon.vue组件6.2、在想要引入svg的vue组件中引…...
SQL82 返回 2020 年 1 月的所有订单的订单号和订单日期
描述Orders订单表order_numorder_datea00012020-01-01 00:00:00a00022020-01-02 00:00:00a00032020-01-01 12:00:00a00042020-02-01 00:00:00a00052020-03-01 00:00:00【问题】编写 SQL 语句,返回 2020 年 1 月的所有订单的订单号(order_num)…...
vulnhub zico2
总结:脏牛提权 目录 下载地址 漏洞分析 信息收集 木马上传 反弹shell 提权 下载地址 zico2.ova (Size: 828 MB)Download: https://www.dropbox.com/s/dhidaehguuhyv9a/zico2.ovaDownload (Mirror): https://download.vulnhub.com/zico/zico2.ova使用方法&…...
处理窗口的常用API函数及窗口处理经验总结(附源码)
目录 1、检测窗口状态 2、将窗口前置显示 2.1、将窗口拉到最前面显示 2.2、将窗口置顶显示 2.3、将窗口设置到指定窗口的上面 3、将不显示的窗口强行显示出来 4、获取窗口的信息 5、通过窗口信息去查找窗口 5.1、调用GetClassName接口去比对窗口的类名 5.2、调用Find…...
@TableId注解详细介绍
TableId注解是专门用在主键上的注解,如果数据库中的主键字段名和实体中的属性名,不一样且不是驼峰之类的对应关系,可以在实体中表示主键的属性上加Tableid注解,并指定Tableid注解的value属性值为表中主键的字段名既可以对应上。 …...
kubectl常用的命令
目录 安装 kubectl 一、命令自动补全 二、常用命令 1、查看所有pod列表 2、查看RC和service列表 3、显示Node的详细信息 4、显示Pod的详细信息, 特别是查看Pod无法创建的时候的日志 5、 根据yaml创建资源, apply可以重复执行,create不行 6、基于nginx.yaml…...
Linux 配置远程SSH服务(密码+密钥)
环境准备: 将虚拟机1恢复快照,然后手动配置一个NAT模式IP为192.168.200.100,hostname设置为fuwu1 将虚拟机1复制为虚拟机2,然后手动配置一个NAT模式IP为192.168.200.200,hostname设置为fuwu2 windows准备 xshell 或 pu…...
WuThreat身份安全云-TVD每日漏洞情报-2023-02-20
漏洞名称:Microsoft Exchange Server 远程执行代码漏洞 漏洞级别:高危 漏洞编号:CVE-2023-21529,CNNVD-202302-1075 相关涉及:Microsoft Exchange Server 2016 Cumulative Update 23 漏洞状态:POC 参考链接:https://tvd.wuthreat.com/#/listDetail?TVD_IDTVD-2023-03822 漏洞…...
面试经常被问悲观锁和乐观锁?什么是cas?来我花3分钟时间告诉你
锁大家都知道吧,多线程访问资源会存在竞争,那么就需要加锁进而让多个线程一个一个访问。 比如有一个房间,一次只能进一个人,现在有十个人都想进去怎么办? 对,加锁。拿一把钥匙,谁抢到钥匙谁就…...
React源码分析3-render阶段(穿插scheduler和reconciler)
本章将讲解 react 的核心阶段之一 —— render阶段,我们将探究以下部分内容的源码: 更新任务的触发更新任务的创建reconciler 过程同步和异步遍历及执行任务scheduler 是如何实现帧空闲时间调度任务以及中断任务的 触发更新 触发更新的方式主要有以下几…...
3功能测试心得分享
1. 登陆、添加、删除、查询模块是我们经常遇到的,这些模块的测试点该如何考虑 (1)登陆 ① 用户名和密码都符合要求(格式上的要求) ② 用户名和密码都不符合要求(格式上的要求) ③ 用户名符合要求,密码不符合要求(格式上的要求) ④ 密码符合要求ÿ…...
Python-推导式
Python 推导式 Python 推导式是一种独特的数据处理方式,可以从一个数据序列构建另一个新的数据序列的结构体。 Python 支持各种数据结构的推导式: 列表(list)推导式 字典(dict)推导式 集合(set)推导式 元组(tuple)推导式 列表推导式 列表推导式格式…...
操作系统线程
进程那一章,我们留下了一个问题 第一个cpu调用进程,进程调用i/o设备,主动进入ready 队列 第二个cpu将程序执行时间平均分时,进程执行时间到 第三个fork函数,我们上一章的lab有实践,可以看出是父进程主动条用…...
vue3中如何定义响应式变量
vue2中定义方式: 熟悉vue2的前端开发小伙伴,都知道定义变量的方式是属于 选项式写法,所有的变量名全都定义在 data(){return { title:‘hello world’}},里,如下图所示: <template><div><h1>{{tit…...
【C++修炼之路】20.手撕红黑树
每一个不曾起舞的日子都是对生命的辜负 红黑树实现:RBTree 前言一.红黑树的概念及性质1.1 红黑树的概念1.2 红黑树的性质二.红黑树的结构2.1 红黑树节点的定义2.2 红黑树类的封装三.红黑树的插入情况1:只变色情况2:变色单旋情况3:双旋插入的代…...
树状数组(高级数据结构)-蓝桥杯
一、简介树状数组 (Binary Indexed Tree,BIT),利用数的二进制特征进行检索的一种树状结构。一种真正的高级数据结构: 二分思想、二叉树、位运算、前缀和。高效!代码极其简洁!二、基本应用数列a1,a2,....,an,操作:单点修改…...
Flink-多流转换(Union、Connect、Join)
文章目录多流转换分流基本合流操作联合(Union)连接(Connect)基于时间的合流——双流联结(Join)窗口联结(Window Join)间隔联结(Interval Join)窗口同组联结&a…...
kubeadmin安装k8s集群
目录 一 、环境部署 1、服务器规划 2、环境准备 二、所有节点安装docker 1、配置yum源,安装docker 2、配置daemon.json文件 三、所有节点安装kubeadm、kubelet 和kubectl 四、部署k8s集群 1、查看初始化需要的镜像 2、导入镜像 3、初始化kubeadm 3.1 方…...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...
深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
在Ubuntu中设置开机自动运行(sudo)指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
用docker来安装部署freeswitch记录
今天刚才测试一个callcenter的项目,所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
