66 消息队列
66 消息队列
基础概念
参考资料:消息队列MQ快速入门(概念、RPC、MQ实质思路、队列介绍、队列对比、应用场景)
- 消息队列就是一个使用队列来通信的组件;
- 为什么需要消息队列?
在实际的商业项目中,它这么做肯定是有道理的。那么没有引入消息队列之前服务存在哪些问题呢?就拿支付服务来说,你提交了一个支付订单后,后台需要进行扣库存、扣款、短信通知等等,你需要等待后台把所有该做的做完了才能知道自己有没有购买成功,用户等等时间过长,后台请求链太多,很多业务不需要马上做完,比如短信通知等等,这些响应速度对于业务来说无关紧要,所以就提出了异步处理。
异步处理就是指我现在不做这个工作,我把这个工作丢给箱子里,有人会来这个箱子里找属于它的工作,我丢完我的工作就做完了,就可以给用户响应了,解耦,异步,提高性能。
应用在:服务解耦、流量控制,有好处也有坏处,坏处就是服务的稳定性降低,人多就不好控制,系统也一样。
消息队列具有两种模型:队列模型和发布/订阅模型。这两个模型简单的来说就是:队列模型即一条消息只能被一个消费者消费、发布订阅模型即一条消息可以被多个消费者消费。
其设计模式就是一发一存一消费,生产者——消费者模型。
五种队列
简单队列
一言以蔽之:简单队列——一个消息对应一个消费者
工作队列
一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!
竞争消费者模式。
这条消息具体会被哪个消费者消费事先并不知。
如何分发消息使之最大限度的发挥每一个消费者的效率——负载均衡。
发布/订阅模型
一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。
ps:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers,这里的交换器是 fanout。下面我们会详细介绍这几种交换器。
路由模式
生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。
也就是让消费者有选择性的接收消息。
主题模式
上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。
符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。
交换机
前面五种队列模式介绍完了,但是实际上只有三种,第一种简单队列,第二种工作模式,剩下的三种都是和交换器绑定的合起来称为一种,这小节我们就来详细介绍交换器。
交换器分为四种,分别是:direct、fanout、topic和 headers。
前面三种分别对应路由模式、发布订阅模式和通配符模式,headers 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和 direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器,这里也不详细介绍。
①、direct
如果路由键完全匹配的话,消息才会被投放到相应的队列。
②、fanout
当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。
③、topic
设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符。
常用6种消息队列介绍和对比
RabbitMQ
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
ZeroMQ
号称史上最快的消息队列,它实际类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。
Kafka
特征
- 分布式消息发布订阅系统,其分区特性、可复制和可容错都是其不错的特性
- 快速持久化,可在O(1)的系统开销下进行消息持久化
- 高吞吐,在一台普通的服务器上就可以达到10W/s的吞吐率
- 完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式、自动实现负载均衡
- 支持同步和异步复制两种
- 支持数据批量发送和拉取
- zero-copy:减少IO操作步骤
- 数据迁移、扩容对用户透明
- 无需停机即可扩展机器
- 其他特性:严格的消息顺序、丰富的消息拉取机制、高效订阅者水平扩展、实时的消息订阅、亿级的消息堆积能力、定期删除机制
优点
- 客户端语言丰富
- 性能卓越、单机写入TPS约在百万级/秒,消息大小为10个字节
- 提供完全分布式架构,并有replica机制,拥有较高的可用性和可靠性,理论上支持消息无限堆积
- 支持批量操作
- 消费者采用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次
- 有优秀的第三方Kafka Web管理界面kafka-Manager
- 在日志领域比较成熟,被多家公司和多个开源项目使用
缺点
- Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
- 使用短轮询方式,实时性取决于轮询间隔时间
- 消费失败不支持重试
- 支持消息顺序,但是一台代理宕机后,就会产生消息乱序
RocketMQ
特征
- 具有高性能、高可靠、高实时、分布式特点
- Producer、Consumer、队列都可以分布式
- Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
- 较少的依赖
- 可以运行在Java语言所支持的平台之上
优点
- 单机支持1万以上持久化队列
- 所有消息都是持久化的,先写入系统PageCache,然后刷盘,可以保证内存和磁盘都有一份数据,访问时,直接从内存取
- 模型简单,接口易用
- 性能优越,可以大量堆积消息在broker中
- 支持多种消费,包括集群消费,广播消费等
- 各个环节分布式扩展设计
- 开发都较活跃,版本更新快
缺点
- 没有web管理界面,提供了一个CLI管理工具来查询、管理和诊断各种问题
- 没有在MQ核心去实现JMS等接口
选择
- ActiveMQ,最早的时候大家都用,但现在用的不是很多了,没经过大规模吞吐量场景的验证,社区不是很活跃,主流上不选择这个
- RabbitMQ较为成熟一些,在可用性、稳定性、可靠性上,RabbitMQ都要超过kafka,综合性能不错,但是erlang语言阻止了大量java工程师深入研究,且不支持事务,消息吞吐能力有限
- Kafka的性能是比RabbitMQ要更强的,RabbitMQ在有大量的消息堆积时,性能会下降,而Kafka不会,但是Kafka的设计初衷是处理日志的,可以看做一个日志系统,针对性非常强,没有具备一个成熟MQ应该具备的特性,它还是个孩子啊
- RocketMQ的思路起源于Kafka,但它对消息的可靠传输及事务性做了优化,适合一些大规模的分布式系统应用,但是生态不够成熟,会有黄掉的风险
- ZeroMQ只是一个网络编程的Pattern库,将常见的网络请求形式(分组管理、链接管理、发布订阅等)进行模式化、组件化。简单来说就是在socket之上、MQ之下。使用ZeroMQ的话,需要对自己的业务代码进行改造,不利于服务解耦
基于Netty实现huiMQ自定义消息队列
Netty服务端将会作为huiMQ,这里先搭一个Netty服务端与Netty客户端,具体的代码请参考:68 Netty
具体项目代码请查看:点击查看
生产者
发送消息队列
所有需要发送的消息都会被统一存在一个队列中,然后由一个线程来对这个队列中的消息发送到Netty服务端中。但是这里会存在消息生产者发送到消息队列时失败,从而导致消息丢失,所以为了保证消息不丢失,在该线程发送后,会把这条消息暂时存放在一个Map中,等到消息队列发送确认响应后才会把这条消息消除,如果超过2秒还未回复,就会把这个消息重新放回待发送消息队列中,同时把重传次数加一,如果重传次数大于5次,就会写入日志。
我们待发送消息队列选择BlockingQueue,该队列是线程安全的,阻塞队列(BlockingQueue) 是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
// 采用阻塞队列保证线程的安全 以便免多线程的情况下导致数据丢失 同时阻塞队列也可以当队列满的时候阻塞线程让其之后再重新添加运行private static final BlockingQueue<MessageBase.Message> blockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);// 等待确认的消息Mapprivate static final ConcurrentHashMap<String, MessageBase.Message> waitAckMessageMap = new ConcurrentHashMap<>();// 等待确认消息超时Mapprivate static final ConcurrentHashMap<String, LocalDateTime> messageTimeOutMap = new ConcurrentHashMap<>();// 超时private static final Long timeout = 2000L;// 最多超时重传次数private static final Integer maxRetries = 5;private static SocketChannel socketChannel;
发送消息线程
/**** @Description 构建一个线程来一直对队列中的消息发送到Netty服务端* @return {@link }* @Author yaoHui* @Date 2024/10/11*/private void sendMessageByThread(){new Thread(() -> {while(true){try{log.info("sendMessageByThread ready");MessageBase.Message message = blockingQueue.take();log.info("sendMessageByThread working");sendMessage(message);} catch (Exception e) {log.error("sendMessageByThread is interrupt 发送消息线程被中断");Thread.currentThread().interrupt();break;}}}).start();}/**** @Description 线程调用 让消息发送到Netty服务端的具体实现逻辑* @param message* @return {@link }* @Author yaoHui* @Date 2024/10/11*/private void sendMessage(MessageBase.Message message){// 超过最大重传次数if(message.getRetryCount() > maxRetries){log.error("消息传输失败次数超过5次:" + message.toString());}else{if (socketChannel.isActive()){socketChannel.writeAndFlush(message);waitAckMessageMap.put(message.getRequestId(),message);messageTimeOutMap.put(message.getRequestId(),LocalDateTime.now().plusSeconds(timeout));}else{log.info("Netty连接失败,请重试");addMessageBlockingQueue(message);}}}
确认消息定时任务
/**** @Description 将超时的消息重新加入队列中重新进行发送* @return {@link }* @Author yaoHui* @Date 2024/10/11*/private void messageRetryThread(){ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);Runnable task = () -> {log.info("messageRetryThread working");for (Map.Entry<String, MessageBase.Message> entry : waitAckMessageMap.entrySet()){MessageBase.Message message = entry.getValue();String key = entry.getKey();LocalDateTime time = messageTimeOutMap.get(key);if(time.isBefore(LocalDateTime.now())){MessageBase.Message newMessage = MessageBase.Message.newBuilder().setRequestId(message.getRequestId()).setCmd(message.getCmd()).setContent(message.getContent()).setRetryCount(message.getRetryCount()+1).setUrlPath(message.getUrlPath()).build();addMessageBlockingQueue(newMessage);messageTimeOutMap.remove(key);waitAckMessageMap.remove(key);}}};// 安排任务在延迟2秒后开始执行,之后每隔3秒执行一次scheduledExecutorService.scheduleAtFixedRate(task,2,3,TimeUnit.SECONDS);}
接收Netty服务端ACK消息
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof MessageBase.Message) {MessageBase.Message message = (MessageBase.Message) msg;if(message.getCmd() == MessageBase.Message.CommandType.ACK){SendMessageThread.getAckAndRemoveMessage(message.getRequestId());log.info("收到ACK:" + message.getRequestId());}else{System.out.println("Received response from server:");System.out.println("ID: " + message.getRequestId());System.out.println("Content: " + message.getContent());}} else {System.err.println("Received an unknown message type: " + msg.getClass().getName());}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客户端连接成功");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("客户端发生异常", cause);ctx.close();}
}
/**** @Description 根据RequestId来消除待确认中的消息* @param key* @return {@link }* @Author yaoHui* @Date 2024/10/11*/public static void getAckAndRemoveMessage(String key){messageTimeOutMap.remove(key);waitAckMessageMap.remove(key);}
全部代码
更详细的代码请查看:点击查看
NettyClient
@Component
@Slf4j
public class NettyClient {private static final EventLoopGroup group = new NioEventLoopGroup();private static final Integer port = 54021;private static final String host = "localhost";private static SocketChannel socketChannel;private static SendMessageThread sendMessageThread = new SendMessageThread();/**** @Description 添加消息到阻塞队列中 为消息生产者调用* @param message* @return {@link }* @Author yaoHui* @Date 2024/10/11*/public void addMessageBlockingQueue(MessageBase.Message message){if(!socketChannel.isActive()){this.start();}sendMessageThread.addMessageBlockingQueue(message);}/**** @Description 该方法提供获取SocketChannel 暂时无用* @return {@link SocketChannel }* @Author yaoHui* @Date 2024/10/11*/public static SocketChannel getSocketChannel() throws Exception {if (!socketChannel.isActive()) {return socketChannel;
// socketChannel = socketChannel1;}return socketChannel;}/**** @Description 连接断开重新连接* @return {@link SocketChannel }* @Author yaoHui* @Date 2024/10/11*/private static SocketChannel retryConnect(){Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).remoteAddress(host, port).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()// 空闲检测.addLast(new IdleStateHandler(60, 60, 60)) // 60秒写空闲,30秒读空闲.addLast(new HeartbeatHandler()).addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance())).addLast(new ProtobufVarint32LengthFieldPrepender()).addLast(new ProtobufEncoder()).addLast(new NettyClientHandler()); // 自定义处理器}});ChannelFuture future = bootstrap.connect();if (future.isSuccess()){return (SocketChannel) future.channel();}else{return null;}}/**** @Description Netty客户端启动函数 调用Start可以启动对Netty服务端的连接* @return {@link }* @Author yaoHui* @Date 2024/10/11*/@PostConstructprivate void start(){Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).remoteAddress(host, port).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()// 空闲检测.addLast(new IdleStateHandler(60, 60, 60)) // 60秒写空闲,30秒读空闲.addLast(new HeartbeatHandler()).addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance())).addLast(new ProtobufVarint32LengthFieldPrepender()).addLast(new ProtobufEncoder()).addLast(new NettyClientHandler()); // 自定义处理器}});ChannelFuture future = bootstrap.connect();//客户端断线重连逻辑future.addListener((ChannelFutureListener) future1 -> {if (future1.isSuccess()) {log.info("连接Netty服务端成功");} else {log.info("连接失败,进行断线重连");future1.channel().eventLoop().schedule(this::start, 10, TimeUnit.SECONDS);}});socketChannel = (SocketChannel) future.channel();sendMessageThread.setSocketChannel(socketChannel);}}
SendMessageThread
package com.fang.screw.client.Thread;import com.fang.screw.client.component.NettyClient;
import com.fang.screw.client.protocol.MessageBase;
import io.netty.channel.socket.SocketChannel;
import lombok.extern.slf4j.Slf4j;import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;/*** @FileName SendMessageThread* @Description* @Author yaoHui* @date 2024-10-11**/
@Slf4j
public class SendMessageThread {// 采用阻塞队列保证线程的安全 以便免多线程的情况下导致数据丢失 同时阻塞队列也可以当队列满的时候阻塞线程让其之后再重新添加运行private static final BlockingQueue<MessageBase.Message> blockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);// 等待确认的消息Mapprivate static final ConcurrentHashMap<String, MessageBase.Message> waitAckMessageMap = new ConcurrentHashMap<>();// 等待确认消息超时Mapprivate static final ConcurrentHashMap<String, LocalDateTime> messageTimeOutMap = new ConcurrentHashMap<>();// 超时private static final Long timeout = 2000L;// 最多超时重传次数private static final Integer maxRetries = 5;private static SocketChannel socketChannel;public SendMessageThread(){sendMessageByThread();messageRetryThread();}public SendMessageThread(SocketChannel socketChannel1){socketChannel = socketChannel1;}public void setSocketChannel(SocketChannel socketChannel1){socketChannel = socketChannel1;}public void addMessageBlockingQueue(MessageBase.Message message){blockingQueue.add(message);}/**** @Description 构建一个线程来一直对队列中的消息发送到Netty服务端* @return {@link }* @Author yaoHui* @Date 2024/10/11*/private void sendMessageByThread(){new Thread(() -> {while(true){try{log.info("sendMessageByThread ready");MessageBase.Message message = blockingQueue.take();log.info("sendMessageByThread working");sendMessage(message);} catch (Exception e) {log.error("sendMessageByThread is interrupt 发送消息线程被中断");Thread.currentThread().interrupt();break;}}}).start();}/**** @Description 线程调用 让消息发送到Netty服务端的具体实现逻辑* @param message* @return {@link }* @Author yaoHui* @Date 2024/10/11*/private void sendMessage(MessageBase.Message message){// 超过最大重传次数if(message.getRetryCount() > maxRetries){log.error("消息传输失败次数超过5次:" + message.toString());}else{if (socketChannel.isActive()){socketChannel.writeAndFlush(message);}else{log.info("Netty连接失败,请重试");
// socketChannel = NettyClient.getSocketChannel();addMessageBlockingQueue(message);}}}/**** @Description 将超时的消息重新加入队列中重新进行发送* @return {@link }* @Author yaoHui* @Date 2024/10/11*/private void messageRetryThread(){ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);Runnable task = () -> {log.info("messageRetryThread working");for (Map.Entry<String, MessageBase.Message> entry : waitAckMessageMap.entrySet()){MessageBase.Message message = entry.getValue();String key = entry.getKey();LocalDateTime time = messageTimeOutMap.get(key);if(time.isBefore(LocalDateTime.now())){MessageBase.Message newMessage = MessageBase.Message.newBuilder().setRequestId(message.getRequestId()).setCmd(message.getCmd()).setContent(message.getContent()).setRetryCount(message.getRetryCount()+1).setUrlPath(message.getUrlPath()).build();addMessageBlockingQueue(newMessage);messageTimeOutMap.remove(key);waitAckMessageMap.remove(key);}}};// 安排任务在延迟2秒后开始执行,之后每隔3秒执行一次scheduledExecutorService.scheduleAtFixedRate(task,2,3,TimeUnit.SECONDS);}/**** @Description 根据RequestId来消除待确认中的消息* @param key* @return {@link }* @Author yaoHui* @Date 2024/10/11*/public static void getAckAndRemoveMessage(String key){messageTimeOutMap.remove(key);waitAckMessageMap.remove(key);}}
huiMQ
保存消息
/**** @Description 保存消息到消息队列中 并且保存到MySQL数据库持久化 注意这里是一个事务 要么都成功要么都别成功* @param message* @return {@link boolean }* @Author yaoHui* @Date 2024/10/12*/@Transactional(rollbackFor = Exception.class)public boolean saveMessage(MessageBase.Message message){try{// 保存到消息队列中if(!messageQueueMap.containsKey(message.getChannel())){BlockingQueue<MessageBase.Message> messageBlockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);messageQueueMap.put(message.getChannel(),messageBlockingQueue);}messageQueueMap.get(message.getChannel()).add(message);// 保存到MySQL数据库中if(!messageQueueMapper.exists(Wrappers.<MessageQueuePO>lambdaQuery().eq(MessageQueuePO::getRequestId,message.getRequestId()).eq(MessageQueuePO::getDelFlag,0))){MessageQueuePO messageQueuePO = new MessageQueuePO();messageQueuePO.setRequestId(message.getRequestId());messageQueuePO.setCmd(message.getCmdValue());messageQueuePO.setContent(message.getContent());messageQueuePO.setUrlPath(message.getUrlPath());messageQueueMapper.insert(messageQueuePO);log.info("HuiMQ保存消息:" + messageQueuePO.toString());}}catch (Exception e){log.info("保存消息至消息队列错误!");e.printStackTrace();return false;}return true;}
收到消费端发来请求消息
// 消费端请求发送消息// 检查是否有超时的消息 如果有则将其重新放置于待重传的消息队列中HuiMessageQueue.checkTimeOutMessage();if(HuiMessageQueue.messageQueueMap.containsKey(message.getChannel())){BlockingQueue<MessageBase.Message> queue = HuiMessageQueue.messageQueueMap.get(message.getChannel());while(!queue.isEmpty()){MessageBase.Message sendMessage = queue.take();log.info("HuiMQ向消费者发送消息:" + sendMessage.toString());ctx.writeAndFlush(sendMessage);// 将消费列为带确认消息HuiMessageQueue.waitAckMessageMap.put(sendMessage.getRequestId(),sendMessage);HuiMessageQueue.messageTimeOutMap.put(sendMessage.getRequestId(), LocalDateTime.now().plusSeconds(2L));}}/**** @Description 检查是否有超时没有收到确认消息的消息 将其重新放置在待发送消息队列中* @return {@link }* @Author yaoHui* @Date 2024/10/13*/public static void checkTimeOutMessage(){for(Map.Entry<String,LocalDateTime> mapEntry : messageTimeOutMap.entrySet()){LocalDateTime time = mapEntry.getValue();if(time.isBefore(LocalDateTime.now())){String s = mapEntry.getKey();MessageBase.Message message = waitAckMessageMap.get(s);waitAckMessageMap.remove(s);messageTimeOutMap.remove(s);// 保存到消息队列中if(!messageQueueMap.containsKey(message.getChannel())){BlockingQueue<MessageBase.Message> messageBlockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);messageQueueMap.put(message.getChannel(),messageBlockingQueue);}messageQueueMap.get(message.getChannel()).add(message);}}}
收到消费端ACK消息
log.info("收到消费端发来的ACK报文:" + message.getRequestId());
HuiMessageQueue.setMessageAck(message.getRequestId());/**** @Description* @param requestId* @return {@link }* @Author yaoHui* @Date 2024/10/13*/public static void setMessageAck(String requestId){messageTimeOutMap.remove(requestId);waitAckMessageMap.remove(requestId);}
消费者
标记需要监听消息方法
/*** @FileName HuiListener* @Description HuiMQ监听注解* @Author yaoHui* @date 2024-10-12**/
@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface HuiListener {String queueName();
}
获取所有被HuiListener注解标记的方法
通过BeanPostProcessor来获取所有被指定注解标记的方法,BeanPostProcessor会在每个Bean初始化前后调用,分别为postProcessBeforeInitialization和postProcessAfterInitialization。
这里会将所有被HuiListener标记的方法和Bean注册到huiListenerRegistry中,为了方便之后通过反射的方式来直接运行指定方法。
/*** @FileName HuiListenerAnnotationBeanPostProcessor* @Description* @Author yaoHui* @date 2024-10-12**/
@Component
public class HuiListenerAnnotationBeanPostProcessor implements BeanPostProcessor, InitializingBean {private static final HuiListenerRegistry huiListenerRegistry = new HuiListenerRegistry();public static boolean huiListenerFlag = false;/**** @Description 在 bean 的初始化方法(如 @PostConstruct 注解的方法或 init-method 指定的方法)之前调用。* @param bean* @param beanName* @return {@link Object }* @Author yaoHui* @Date 2024/10/12*/@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {return BeanPostProcessor.super.postProcessBeforeInitialization(bean, beanName);}/**** @Description 在 bean 的初始化方法之后调用。查看当前的bean是否存在被HuiListener注解过的方法* @param bean* @param beanName* @return {@link Object }* @Author yaoHui* @Date 2024/10/12*/@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Method[] methods = bean.getClass().getMethods();for(Method method : methods){if(method.isAnnotationPresent(HuiListener.class)){processHuiListener(method,bean);huiListenerFlag = true;}}return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);}private void processHuiListener(Method method,Object bean){HuiListener huiListener = method.getAnnotation(HuiListener.class);HuiListenerEndpoint huiListenerEndpoint = new HuiListenerEndpoint();huiListenerEndpoint.setBean(bean);huiListenerEndpoint.setMethod(method);huiListenerRegistry.registerListenerEndpoint(huiListener.queueName(),huiListenerEndpoint);}@Overridepublic void afterPropertiesSet() throws Exception {}
}
/*** @FileName HuiListenerEndpoint* @Description* @Author yaoHui* @date 2024-10-12**/
@Data
public class HuiListenerEndpoint {private Method method;private Object bean;
}
/*** @FileName HuiListenerRegistry* @Description* @Author yaoHui* @date 2024-10-12**/
@Slf4j
public class HuiListenerRegistry {public static final ConcurrentHashMap<String,HuiListenerEndpoint> huiListenerEndpointConcurrentHashMap = new ConcurrentHashMap<>();/**** @Description 添加HuiListener监听的方法* @param queueName* @param huiListenerEndpoint* @return {@link }* @Author yaoHui* @Date 2024/10/12*/public void registerListenerEndpoint(String queueName,HuiListenerEndpoint huiListenerEndpoint){huiListenerEndpointConcurrentHashMap.put(queueName,huiListenerEndpoint);}/**** @Description 消费者处理收到消息的主要逻辑* @param message* @return {@link boolean }* @Author yaoHui* @Date 2024/10/12*/public boolean handleMessage(MessageBase.Message message){HuiListenerEndpoint huiListenerEndpoint = huiListenerEndpointConcurrentHashMap.get(message.getChannel());if(ObjectUtils.isEmpty(huiListenerEndpoint)){log.info("消息无对应Channel消费:" + message.getChannel());return true;}Method method = huiListenerEndpoint.getMethod();Object bean = huiListenerEndpoint.getBean();try{Class<?>[] classes = method.getParameterTypes();method.invoke(bean,JSON.parseObject(message.getContent(),classes[0]));}catch (Exception e){log.info("消息消费异常");e.printStackTrace();return false;}return true;}
}
定期发送请求消息
/*** @FileName getMessageThread* @Description* @Author yaoHui* @date 2024-10-12**/
@Slf4j
public class GetMessageThread {private NettyClient nettyClient;// private static HuiListenerRegistry huiListenerRegistry = new HuiListenerRegistry();public GetMessageThread(NettyClient nettyClient){this.nettyClient = nettyClient;regularGetMessage();}/**** @Description 定时发送是否存在消息* @return {@link }* @Author yaoHui* @Date 2024/10/12*/private void regularGetMessage(){ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);Runnable task = () -> {
// log.info("regularGetMessage is running");Set<String> channel = HuiListenerRegistry.huiListenerEndpointConcurrentHashMap.keySet();for(String s : channel){MessageBase.Message message = MessageBase.Message.newBuilder().setCmd(MessageBase.Message.CommandType.SEND_MESSAGE).setChannel(s).build();nettyClient.sendMessage(message);}};scheduledExecutorService.scheduleAtFixedRate(task,2,3, TimeUnit.SECONDS);}}
消费端启动定期请求消息
如果这个服务中有被HuiListener注解标记的方法就会启用这个方法,SmartInitializingSingleton会在所有的Bean初始化之后运行。
/*** @FileName MyBeanInjector* @Description* @Author yaoHui* @date 2024-10-12**/
@Component
@Slf4j
public class MyBeanInjector implements SmartInitializingSingleton {private final NettyClient nettyClient;public MyBeanInjector(NettyClient nettyClient){this.nettyClient = nettyClient;}@Overridepublic void afterSingletonsInstantiated() {log.info("GetMessageThread is ready");if (HuiListenerAnnotationBeanPostProcessor.huiListenerFlag){GetMessageThread getMessageThread = new GetMessageThread(nettyClient);log.info("GetMessageThread is running");}}
}
使用HuiMQ
/*** @FileName ReceiveHuiMessage* @Description* @Author yaoHui* @date 2024-10-12**/
@Component
@Slf4j
public class ReceiveHuiMessage {@HuiListener(queueName = "queue")public void noticeUserHaveComment(CommentVO commentVO){log.info("Chat模块接收到消息:" + commentVO.toString());}}
进阶
如何保证消息不丢失
对于生产者来说需要做到失败重传,采用确认机制可以有效避免消息丢失;
对于Broker来说,需要控制给生产者确认的时机,在Broker保存消息到MySQL后再进行返回,可以有效避免消息丢失;
对于消费者来说,需要在消息真正消费之后再给Broker进行确认,可以避免消息丢失;
如何保证消息不会被重复消费
对于正常的业务消息在消息队列中是不可避免会存在重复消费问题,所以我们只能在业务层面进行消除该影响。
这种问题就是典型的幂等性问题,即对于同样的一种操作所带来的结果是一致的,比如这种update t1 set money = 150 where id = 1 and money = 100;
执行多少遍money
都是150,这就叫幂等。
所以我们一般的解决方法是添加版本号version,在执行SQL时判断version是否一致,不一致则不执行。或者记录关键的key,像订单号这种,执行过的就不需要再执行。
如何保证消息的有序性
如何处理消息堆积
消息队列根本原因是消费者消费跟不上生产者,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。
假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加Topic
的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者。
相关文章:

66 消息队列
66 消息队列 基础概念 参考资料:消息队列MQ快速入门(概念、RPC、MQ实质思路、队列介绍、队列对比、应用场景) 消息队列就是一个使用队列来通信的组件;为什么需要消息队列? 在实际的商业项目中,它这么做肯…...
【系统分析师】-案例篇-信息系统安全
1、信息系统的安全威胁 来自于物理环境、通信链路、网络系统、操作系统、应用系统以及管理等多个方面。 物理安全威胁是指对系统所用设备的威胁,如自然灾害、电源故障、数据库故障和设备被盗等造成数据丢失或信息泄漏。 通信链路安全威胁是指在传输线路上安装窃…...

基于极光优化算法(Polar Lights Optimization, PLO)的多无人机协同三维路径规划(提供MATLAB代码)
一、极光优化算法介绍 极光优化算法(Polar Lights Optimization, PLO)是2024年提出的一种新型的元启发式优化算法,它从极光这一自然现象中汲取灵感。极光是由太阳风中的带电粒子在地球磁场的作用下,与地球大气层中的气体分子碰撞…...
TypeScript类型体操5
类型编程主要的目的就是对类型做各种转换,如何对类型做修改? TypeScript 类型系统支持 3 种可以声明任意类型的变量: type、infer、类型参数。 type:类型别名,声明一个变量存储某个类型。type t Promise<number&g…...
搭建广告展示页Start
想自定义广告- 场景: app冷启动/热启动-有广告需求,就打开广告页,没有的话就去登录或者主页 有的app有的需要广告页,有的不需要,搞个配置呗!!! 通过首选项配置存储我们的一些常用…...
无极低码基础版(部署版)课程计划
基础版(部署版)使用指南 特点 简单:1分钟学会无需编码:会SQL即可适合人群:纯小白0代码写服务1. 本地环境安装 JDKMySQLRedisTomcat2. 环境变量配置 JDK无极低码授权3. 配置文件修改 4. 服务启动 5. 服务发布示例 服务手动注册SQL语句注册6. 新增接口示例 正常新增非空参…...
Word文档功能快捷键大全
以下是 Microsoft Word 的全面快捷键大全,涵盖了文档操作、文本编辑、格式化、导航等多种功能,帮助你提高工作效率。 Word 全面快捷键和快捷方式表 功能类别快捷键/快捷方式功能描述基本文档操作Ctrl N新建文档Ctrl O打开文档Ctrl S保存文档F12另存…...

题目:1297. 子串的最大出现次数
> Problem: 1297. 子串的最大出现次数 题目:1297. 子串的最大出现次数 题目描述 给定一个字符串 s,要求找到满足以下条件的任意子串的出现次数,并返回该子串的最大出现次数: 子串中不同字母的数目必须小于等于 maxLetters。…...
一力破万法,高并发系统优化通解思路
高并发系统优化:从理论到Java实践 针对高并发场景,以下策略能够有效提升系统的稳定性和响应速度: 加集群 结果:通过增加服务器数量,实现负载均衡,提高系统整体处理能力。过程: 配置负载均衡器&…...
P8635 [蓝桥杯 2016 省 AB] 四平方和
对于一个给定的正整数,可能存在多种平方和的表示法。 要求你对 44个数排序使得 0≤a≤b≤c≤d。 输入 #1复制 5 输出 #1 0 0 1 2 输入 #2 12 输出 #2 0 2 2 2 输入 #3 773535 输出 #3 1 1 267 838 代码 #include<bits/stdc.h> using namespace …...

ElasticSearch是什么?
1.概述 Elasticsearch 是一个基于 Apache Lucene 构建的开源分布式搜索引擎和分析引擎。它专为云计算环境设计,提供了一个分布式的、高可用的实时分析和搜索平台。Elasticsearch 可以处理大量数据,并且具备横向扩展能力,能够通过增加更多的硬…...

2024年四非边缘鼠鼠计算机保研回忆(记录版 碎碎念)
Hi,大家好,我是半亩花海。写下这篇博客时已然是金秋十月,心中的石头终于落地,恍惚间百感交集。对于保研这条路,我处于摸着石头过河、冲击、随缘的这些状态。计算机保研向来比其他专业难,今年形势更是艰难。…...
clickhouse常用脚本语句
1.创建库和删除库 drop database IF EXISTS rt_db CREATE DATABASE rt_db ENGINE = Ordinary; CREATE DATABASE rt_db ENGINE = Atomic;2.创建表 CREATE TABLE IF NOT EXISTS intellect_alarm_info ( `id` UInt64 , `client_info_id...

GeneMark软件的秘钥gm_key失效怎么办?
GeneMark软件的gm_key失效怎么办 1. 下载网址(为软件的下载界面):http://topaz.gatech.edu/GeneMark/license_download.cgi 2.下载界面 根据自己的需求下载对应的版本和类型的gm_key秘钥 3.填写注册信息 4. 点击下载软件和密钥 5. 将秘钥…...

线性回归逻辑回归-笔记
一、线性回归(Linear Regression) 1. 定义 线性回归是一种用于回归问题的算法,旨在找到输入特征与输出值之间的线性关系。它试图通过拟合一条直线来最小化预测值与真实值之间的误差。 2. 模型表示 线性回归模型假设目标变量(输…...

如何将数据从 AWS S3 导入到 Elastic Cloud - 第 1 部分:Elastic Serverless Forwarder
作者:来自 Elastic Hemendra Singh Lodhi 这是多部分博客系列的第一部分,探讨了将数据从 AWS S3 导入 Elastic Cloud 的不同选项。 Elasticsearch 提供了多种从 AWS S3 存储桶导入数据的选项,允许客户根据其特定需求和架构策略选择最合适的方…...
Linux基础-正则表达式
正则表达式概述 正则表达式是处理字符串的一种工具,可以用于查找、删除、替换特定的字符串,主要用于文件内容的处理。与之不同的是,通配符则用于文件名称的匹配。正则表达式通过使用特殊符号,帮助用户轻松实现对文本的操作。 一…...

【HTML格式PPT离线到本地浏览】
文章目录 概要实现细节小结 概要 最近在上课时总是出现网络不稳定导致的PPT无法浏览的情况出现,就想到下载到电脑上。但是PPT是一个HTML的网页,无法通过保存网页(右键另存为mhtml只能保存当前页)的形式全部下载下来,试…...
如何在Vue项目中封装axios
文章目录 一、axios简介基本使用 二、封装axios的原因三、封装axios的方法1. 设置接口请求前缀2. 设置请求头和超时时间3. 封装请求方法4. 添加请求拦截器5. 添加响应拦截器小结 一、axios简介 axios 是一个基于 XMLHttpRequest 的轻量级HTTP客户端,适用于浏览器和…...

linux 配置ssh免密登录
一、 cd /root/.ssh/ #不存在就创建mkdir /root/.ssh ssh-keygen #连续按4个回车 ll二、将公钥发送到目标服务器下 #公钥上传到目标服务器 ssh-copy-id root192.168.31.142 #回车完也是要输入密码的 #测试一下免密登录: ssh root192.168.31.142 成功...
VTK如何让部分单位不可见
最近遇到一个需求,需要让一个vtkDataSet中的部分单元不可见,查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行,是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示,主要是最后一个参数,透明度…...

ArcGIS Pro制作水平横向图例+多级标注
今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作:ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等(ArcGIS出图图例8大技巧),那这次我们看看ArcGIS Pro如何更加快捷的操作。…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

视觉slam十四讲实践部分记录——ch2、ch3
ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

【VLNs篇】07:NavRL—在动态环境中学习安全飞行
项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲
文章目录 前言第一部分:体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分:体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...
多模态图像修复系统:基于深度学习的图片修复实现
多模态图像修复系统:基于深度学习的图片修复实现 1. 系统概述 本系统使用多模态大模型(Stable Diffusion Inpainting)实现图像修复功能,结合文本描述和图片输入,对指定区域进行内容修复。系统包含完整的数据处理、模型训练、推理部署流程。 import torch import numpy …...

HubSpot推出与ChatGPT的深度集成引发兴奋与担忧
上周三,HubSpot宣布已构建与ChatGPT的深度集成,这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋,但同时也存在一些关于数据安全的担忧。 许多网络声音声称,这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制
目录 节点的功能承载层(GATT/Adv)局限性: 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能,如 Configuration …...