当前位置: 首页 > news >正文

从 0 到 1 ,手把手教你编写《消息队列》项目(Java实现) —— 创建虚拟机

文章目录

  • 一、虚拟机
  • 二、关于消息的API
    • 发布消息
      • 直接交换机 DIRECT 转发规则
      • 扇出交换机 FANOUT 转发规则
      • 主题交换机 TOPIC 转发规则
      • 匹配规则
      • Router类
    • 订阅消息
      • 消费者
      • 队列如何给订阅的消费者发送消息
      • 自动发送消息至订阅者
    • 应答消息
  • 三、代码编写


一、虚拟机

接下来要创建虚拟机,每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.同时提供 api 供上层调用.

在这里咱们实现的单虚拟机,并没有提供创建虚拟机,销毁虚拟机的功能,但是为了方便后续的扩展,咱们要约定好如何区分多个虚拟机之间的交换机,队列,绑定关系.

不同虚拟机当然可以拥有相同名字的交换机等.
比如
虚拟机A中 拥有 交换机C,
虚拟机B中 拥有 交换机C,
咱们视为以上情况是合法的.

咱们这里采取的方案是,在客户提供的交换机等的身份标识(交换机名字),前加上虚拟机的名字.
即 客户要在虚拟机 VirtualHostA 中创建交换机 exchangeC,咱们服务器存储的交换机名字是 VirtualHostAexchangeC.

当然也有其他方案,大家可以自由发挥.

二、关于消息的API

这是虚拟机要提供给上层的API
在这里插入图片描述

前6个API咱们已经写好了,只需要直接调用下层的API即可.
咱们现在来考虑后 7 - 9 这三个API的实现.

发布消息

发布消息API:其实就是生产者将消息发送给对应的交换机,交换机再根据不同的转发规则,转发给与之相绑定且符合规则的消息队列.

绑定关系 Binding 中有一个 bindingKey 属性
消息 Message 中 有一个 routingKey 属性

下面就来讲解一下三种交换机的转发规则已经这两个 Key 的不同含义.

直接交换机 DIRECT 转发规则

在直接交换机中,
bindingKye是无意义的,
routingKey是要转发到的队列的队列名.

直接交换机的转发规则, 是无视 bindingKey的,即 直接交换机是否与这个队列绑定都没有关系,而直接将消息转发到 routingKey指定的队列名的队列中.


扇出交换机 FANOUT 转发规则

在扇出交换机中,
bindingKye是无意义的,
routingKey是无意义的.

扇出交换机的转发规则,是将收到的消息转发到与之绑定的所有队列中.与bindingKye和routingKey是没有任何关系的.


主题交换机 TOPIC 转发规则

在主题交换机中,
bindingKey是创建绑定时,给绑定指定的特殊字符串(相当于一把锁),
routingKey是转发消息时,给消息指定的特殊字符串(相当于一把钥匙).

主题交换机的转发规则,是将收到的消息的routingKey与绑定的所有队列中的 bindingKey 进行匹配,当且仅当匹配成功时,才将消息转发给该队列.

匹配规则

routingKey规则

  • 由数字,字母,下划线组成
  • 使用 . 将routingKey分成多个部分.

bindingKey规则

  • 由数字,字母,下划线组成
  • 使用 . 将routingKey分成多个部分.
  • 支持两种特殊的符号作为通配符 * 与 # (*和#必须是作为被 . 分割出来的单独部分如 aaa*.bb就是非法的)
    * 可以匹配任何一个独立的部分
    # 可以匹配0个或多个的独立部分

匹配规则
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

Router类

在core包中创建 Router类,来完成对bindingKey与routingKey的校验与匹配.

/*** 这个类用来检查 bindingKey与routingKey 是否合法* 以及 bindingKey与routingKey 的匹配功能,* 以及 根据不同交换机的转发规则判断 消息 Message 是否可以转发给对应的绑定队列*/
public class Router {// bindingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.// 检查 BindingKey 是否合法public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为约定的).// 这样约定是因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性 提升不大~~// 1. aaa.#.#.bbb    => 非法// 2. aaa.#.*.bbb    => 非法// 3. aaa.*.#.bbb    => 非法// 4. aaa.*.*.bbb    => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}// 检查 RoutingKey 是否合法// routingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分public boolean checkRoutingKey(String routingKey) {if (routingKey.length() == 0) {// 空字符串. 合法的情况. 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 ""return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);// 判定该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}// 判定该字母是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}// 判定该字母是否是阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}// 判定是否是 _ 或者 .if (ch == '_' || ch == '.') {continue;}// 该字符, 不是上述任何一种合法情况, 就直接返回 falsereturn false;}// 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 truereturn true;}// 用来判断该消息是否可以转发给这个绑定对应的队列public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException {// 根据不同的 exchangeType 使用不同的转发规则if (exchangeType == ExchangeType.FANOUT) {// 如果是 FANOUT 类型,则该交换机上绑定的所有队列都需要转发return true;} else if (exchangeType == ExchangeType.TOPIC) {return routeTopic(binding,message);} else {throw new MqException("[Router] 交换机类型非法! exchangeType="+exchangeType);}}// 用来匹配 bindingKey与routingKeyprivate boolean routeTopic(Binding binding, Message message) {// 先把这两个 key 进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 引入两个下标, 指向上述两个数组. 初始情况下都为 0int bindingIndex = 0;int routingIndex = 0;// 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 forwhile (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {if (bindingTokens[bindingIndex].equals("*")) {// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!bindingIndex++;routingIndex++;continue;} else if (bindingTokens[bindingIndex].equals("#")) {// 如果遇到 #, 需要先看看有没有下一个位置.bindingIndex++;if (bindingIndex == bindingTokens.length) {// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!return true;}// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex == -1) {// 没找到匹配的结果. 匹配失败return false;}// 找到的匹配的情况, 继续往后匹配.bindingIndex++;routingIndex++;} else {// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex++;routingIndex++;}}// [情况五] 判定是否是双方同时到达末尾// 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失败的.if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if (routingTokens[i].equals(bindingToken)) {return i;}}return -1;}
}

订阅消息

消费者

咱们要创建一个消费者类,其中有消费者的信息,以及该消费者订阅的队列的名字,
以及消息应答模式,以及回调函数.

回调函数是什么?
是让消费者自己设定一个函数,当有新的消息转发给该消费者后,就执行这个回调函数

在这里插入图片描述

/*** 消费者的回调函数*/
@FunctionalInterface
public interface Consumer {// Delivery 是 “投递” 的意思,这个方法预期是在每次服务器收到消息之后,来调用// 通过这个方法把消息推送给对应的消费者// 这里的方法名与参数列表都是参考 RabbitMQ 的void handleDelivery(String consumerTag, BasicProperties basicProperties,byte[] body) throws MqException, IOException;
}
/*** 表示一个消费者(完整的执行环境)*/
@Data
public class ConsumerEnv {// 消费者信息private String consumerTag;// 订阅队列的名字private String queueName;// true -> 自动应答, false -> 手动应答private boolean autoAck;// 回调函数private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}
}

队列如何给订阅的消费者发送消息

这里咱们要想清楚的是,一个队列可以有多个消费者,
新来的消息要转发给哪个消费者呢?
咱们在这里采取轮询策略,即让消费者排队,依次将消息发送给消费者,当消费者收到消息后,则移动到队伍的最后等待下个消息.

因此咱们要给核心类 Message类再增加几个属性和方法,来管理消费者,

/*** 表示一个存储消息的队列* MSG =》Message* 消息队列的使用者是消费者*/
@Data
public class MSGQueue {// 表示队列的身份标识private String name;// 表示队列是否持久化private boolean durable = false;// true -> 这个队列只能被一个消费者使用,false -> 大家都能使用这个队列// 后续代码不实现相关功能private boolean exclusive = false;// true -> 没人使用后,自动删除,false -> 没人使用,不自动删除private boolean autoDelete = false;// 表示扩展参数,后续代码没有实现private Map<String,Object> arguments = new HashMap<>();// 当前队列有哪些消费者订阅了private List<ConsumerEnv> consumerEnvList = new ArrayList<>();// 记录当前取到了第几个消费者,方便实现轮询策略private AtomicInteger consumerSeq = new AtomicInteger(0);// 添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}// 删除订阅者暂不考虑// 挑选一个订阅者,用来处理当前的消息(按照轮询的方式)public ConsumerEnv chooseConsumer() {// 无人订阅if (consumerEnvList.size() == 0) {return null;}int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}public String getArguments() {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}public void setArguments(String arguments) {ObjectMapper objectMapper = new ObjectMapper();try {this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public void setArguments(Map<String,Object> arguments) {this.arguments = arguments;}public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key,Object value) {arguments.put(key, value);}
}

自动发送消息至订阅者

那么消费者要如何拿到消息呢?即如何将消息发送给消费者,咱们这里采取的是自动发送,即队列中来了新消息,就自动将新消息发送给订阅了这个队列的消费者.

咱们实现的方法是,使用一个阻塞队列,当生产者发布消息到交换机时,交换机转发消息到对应的队列后,就把队列名当作令牌添加到这个阻塞队列中,再配置一个扫描线程,去时刻扫描这个阻塞队列中是否有新的令牌了,有了新令牌,则根据令牌去对应的队列中,去把新消息安装轮询策略转发给消费者.


应答消息

应答消息共有两种模式.

  • 自动应答:将消息发送给消费者就算应答了(不关心消费者收没收到,相当于没应答)
  • 手动应答:需要消费者手动调用应答方法(确保消费者收到消息了)

三、代码编写

/*** 通过这个类, 来表示 虚拟主机.* 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.* 同时提供 api 供上层调用.* 针对 VirtualHost 这个类, 作为业务逻辑的整合者, 就需要对于代码中抛出的异常进行处理了.*/@Data
public class VirtualHost {private String virtualHostName;private Router router = new Router();private DiskDataCenter diskDataCenter = new DiskDataCenter();private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();// 操作交换机的锁对象,防止多线程操作交换机时,出现线程安全问题,如 创建了两个拥有相同身份标识的交换机private final Object exchangeLocker = new Object();// 操作队列的锁对象,防止多线程,操作队列时,出现线程安全问题,如 创建了两个拥有相同身份标识的队列private final Object queueLocker = new Object();// 消费者管理中心ConsumerManager consumerManager = new ConsumerManager(this);public VirtualHost(String virtualHostName) {this.virtualHostName = virtualHostName;// MemoryDataCenter 并不需要初始化,当 new MemoryDataCenter();时,里面所需的数据结构也都已经创建好了// DiskDataCenter 就需要初始化操作,去建库建表建文件和初始化数据的设定diskDataCenter.init();try {// 将硬盘中已有的数据恢复到内存中memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();}}// 创建交换机// 如果交换机不存在, 就创建. 如果存在, 直接返回.// 返回值是 boolean. 创建成功, 返回 true. 失败返回 falsepublic boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String,Object> arguments) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 把交换机的名字, 加上虚拟主机作为前缀.// 1.判断交换机是否存在,直接从内存查询if (memoryDataCenter.getExchange(exchangeName) != null) {// 该交换机已经存在!System.out.println("[VirtualHost] 交换机已经存在! exchangeName=" + exchangeName);return true;}// 2.创建交换机Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3.把交换机对象写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}// 4.把交换机对象写入内存memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建成功! exchangeName=" + exchangeName);// 上述逻辑, 先写硬盘, 后写内存. 目的就是因为硬盘更容易写失败. 如果硬盘写失败了, 内存就不写了.// 要是先写内存, 内存写成功了, 硬盘写失败了, 还需要把内存的数据给再删掉. 就比较麻烦了.}return true;} catch (Exception e) {System.out.println("[VirtualHost] 交换机创建失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}// 删除交换机public boolean exchangeDelete(String exchangeName) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 1.判断交换机是否存在Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if (existsExchange == null) {throw new MqException("[virtualHostName] 交换机不存在无法删除!");}if (existsExchange.isDurable()) {// 2.删除硬盘上的数据diskDataCenter.deleteExchange(exchangeName);}// 3.删除内存上的数据memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}// 创建队列public boolean queueuDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments) {// 给队列名字加前缀queueName = virtualHostName + queueName;try {synchronized (queueLocker) {// 1.判断队列是否存在if (memoryDataCenter.getQueue(queueName) != null) {System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName);return true;}// 2.创建队列MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3.向硬盘中写入数据if (durable) {diskDataCenter.insertQueue(queue);}// 4.向内存中写入数据memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName);e.printStackTrace();return false;}}// 删除队列public boolean queueDelete(String queueeName) {queueeName = virtualHostName + queueeName;try {synchronized (queueLocker) {MSGQueue existsQueue = memoryDataCenter.getQueue(queueeName);// 1.判断交换机是否存在if (existsQueue == null) {throw new MqException("[virtualHostName] 队列不存在无法删除!");}// 2.删除硬盘上的数据if (existsQueue.isDurable()) {diskDataCenter.deleteQueue(queueeName);}// 3.删除内存上的数据memoryDataCenter.deleteQueue(queueeName);System.out.println("[VirtualHost] 队列删除成功! queueeName=" + queueeName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 队列删除失败! queueeName=" + queueeName);e.printStackTrace();return false;}}// 创建绑定public boolean bindingDeclare(String exchangeName,String queueName,String bindingKey) {exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在,对应的交换机 队列是否存在,bindingKey是否合法if (memoryDataCenter.getBinding(exchangeName, queueName) != null) {return true;}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}if (!router.checkBindingKey(bindingKey)) {throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);}// 2.创建绑定Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 3.写入硬盘if (exchange.isDurable() && queue.isDurable()) {diskDataCenter.insertBinding(binding);}// 4.写入内存memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName+ ", queueName=" + queueName);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName+ ", queueName=" + queueName);e.printStackTrace();return false;}}// 删除绑定public boolean bindingDelete(String exchangeName,String queueName) {exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在,对应的交换机 队列是否存在,bindingKey是否合法Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if (binding == null) {throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);}// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.diskDataCenter.deleteBinding(binding);// 3. 删除内存的数据memoryDataCenter.deleteBinding(binding);System.out.println("[VirtualHost] 删除绑定成功!");}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 删除绑定失败!");e.printStackTrace();return false;}}// 发送消息到指定的交换机/队列中public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {exchangeName = virtualHostName + exchangeName;// 1.检查 routingKey是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);}// 2.查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 3.判断交换机绑定类型if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.// 4.查找对应的队列String queueName = virtualHostName + routingKey;MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}// 5.构造 messageMessage message = Message.createMessageWithId(routingKey, basicProperties, body);// 6.发送消息sendMessage(queue,message);return true;} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBinding(exchangeName);for (Map.Entry<String,Binding> entry : bindingsMap.entrySet()) {// 6.获取绑定对象,判断对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null){// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());continue;}// 7.构造消息Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 8.判断该消息是否可以发送给该队列if (router.route(exchange.getType(),binding,message)) {sendMessage(queue,message);}}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败!");e.printStackTrace();return false;}}// 发送消息到硬盘与内存private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息,即 把消息写入到 硬盘 和 内存 中int deliverMode = message.getDeliverMode();// deliverMode 为 1 不持久化,为 2 持久化if (deliverMode == 2) {diskDataCenter.sendMessage(queue,message);}// 写入内存memoryDataCenter.sendMessage(queue,message);// 通知消费者可以消费消息了consumerManager.notifyConsume(queue.getName());}// 订阅消息// 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后,应答的方式  为 true 自动应答,为 false 手动应答// consumer: 是一个回调函数,此处类型设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda样子public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[VirtualHost] basicConsume成功 queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume失败 queueName=" + queueName);e.printStackTrace();return false;}}// 消息的手动应答public boolean basicAck(String queueName,String messageId) {queueName = virtualHostName + queueName;try {// 1.获取消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);}// 2.删除硬盘上的消息if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}// 3.删除消息中心的消息memoryDataCenter.removeMessage(messageId);// 4.删除待确认消息中的消息memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName + ", messageId="+ messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败! 消息被成功确认! queueName=" + queueName + ", messageId="+ messageId);e.printStackTrace();return false;}}}

相关文章:

从 0 到 1 ,手把手教你编写《消息队列》项目(Java实现) —— 创建虚拟机

文章目录 一、虚拟机二、关于消息的API发布消息直接交换机 DIRECT 转发规则扇出交换机 FANOUT 转发规则主题交换机 TOPIC 转发规则匹配规则Router类 订阅消息消费者队列如何给订阅的消费者发送消息自动发送消息至订阅者 应答消息 三、代码编写 一、虚拟机 接下来要创建虚拟机,…...

GIT版本控制--前言

欢迎来到《GIT版本控制》专栏&#xff01;在当今软件开发和协作的世界中&#xff0c;版本控制是不可或缺的工具之一。无论您是一名初学者&#xff0c;一位经验丰富的开发者&#xff0c;还是一个项目团队的成员&#xff0c;都有可能会受益于对GIT的深入了解。 GIT是一个强大的分…...

由于 MAC 地址的问题,导致网络不通的原因和分析

由于 MAC 地址的问题&#xff0c;导致网络不通的原因和分析 将现象及原因分析发给大家&#xff0c;供大家参考&#xff0c;以后有类似问题时有个解决问题的参考开发板网络不通&#xff0c;也抓不到包&#xff0c;折腾了好久&#xff0c;将电脑和开发板用网线直连&#xff0c;结…...

游戏开发中的设计模式

单例模式 实例化单一对象&#xff0c;懒加载 //单例模式 class GameManagerSingleton {private constructor(){}private static instance:GameManagerSingleton;public static Instance(){if(!GameManagerSingleton.instance){this.instance new GameManagerSingleton();}re…...

React核心原理与实际开发

学习目标 React是啥&#xff1f; 官方定义&#xff1a;将前端请求获取到的数据渲染为HTML视图的JavaScript库。 一、React入门 1、React项目创建 直接创建react&#xff0c;使用初始化会创建package.json npm init -y再安装 2、React基本使用 使用纯JS创建ReactDOM&#…...

Springboot+vue的企业OA管理系统(有报告),Javaee项目,springboot vue前后端分离项目。

演示视频&#xff1a; Springbootvue的企业OA管理系统&#xff08;有报告&#xff09;&#xff0c;Javaee项目&#xff0c;springboot vue前后端分离项目。 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的前后端分离的企业OA管理系统&#xff0c;采用M&#xff08;m…...

3、字符设备驱动框架和开发步骤

一、Linux内核对文件的分类 Linux的文件种类 1、-&#xff1a;普通文件2、d&#xff1a;目录文件3、p&#xff1a;管道文件4、s&#xff1a;本地socket文件5、l&#xff1a;链接文件6、c&#xff1a;字符设备7、b&#xff1a;块设备 Linux内核按驱动程序实现模型框架的不同&…...

[MySQL]基础篇

文章目录 1. MySQL基本使用1.1 MySQL的启动和登录1.1.1 MySQL的启动1.1.2 MySQL的客户端连接 1.2 数据模型 2. SQL2.1 SQL类型2.1.1 数值类型2.1.2 字符串类型2.1.3 日期类型 2.2 DDL2.2.1 数据库操作2.2.2 表操作 - 查询2.2.3 表操作 - 创建表2.2.4 表操作 - 修改 2.3 DML2.3.…...

Meta Semantic Template for Evaluation of Large Language Models

本文是LLM系列文章&#xff0c;针对《Meta Semantic Template for Evaluation of Large Language Models》的翻译。 大型语言模型评估的元语义模板 摘要1 引言2 相关工作3 方法4 实验5 结论 摘要 大型语言模型(llm)是否真正理解语言的语义&#xff0c;或者只是记住训练数据?…...

Git相关知识(1)

目录 1.初识Git 1.基础知识 2.centos中下载 2.基本操作 1.创建本地仓库 2.配置本地仓库 3.版本库、工作区、暂存区 4.添加文件 5.add和commit对git文件的作用 6.修改文件 7.版本回退 8.撤销修改 9.删除文件 3.分支操作 1.HEAD与分支 2.创建分支 3.删除分支 …...

pytorch中nn.DataParallel多次使用

pytorch中nn.DataParallel多次使用 import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader# 定义模型 class MyModel(nn.Module):def __init__(self):super(MyModel, self).__init__()self.fc nn.Linear(10, 1)def forwa…...

制作电商页面(Html)

任务 制作一个电商页面&#xff0c;要求所卖物品清晰&#xff0c;页面色调清晰&#xff0c;要有主页和详情页。 网站所买物品&#xff1a;书籍 色调&#xff1a;#FF2400 橙红色 代码 主页HTML代码&#xff1a; <html><head><meta charset"utf-8"…...

Android Sutdio依赖Snapshot版本,无法同步最新的包

起因 局域网中搭建了Nexus托管本地打包的aar&#xff0c;正常情况下&#xff0c;把修改完成的库推送到仓库后&#xff0c;其他项目引用Snapshot版本的依赖&#xff0c;同步后会马上下载最新的包&#xff0c;但是当第二次推送后&#xff0c;就没有重新下载最新的包&#xff0c;…...

Feign调用异常触发降级捕获异常

通过配置fallbackFactory来捕获异常信息&#xff0c;代码如下 FeignClient(name "user", fallbackFactory UserFallBackFactory.class) public interface UserFeign {PostMapping("/get/list")Map getList();}Component public class UserFallBackFacto…...

Springboot 音乐网站管理系统idea开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 springboot 音乐网站管理系统是一套完善的信息系统&#xff0c;结合springboot框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用springboot框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统 具有完整的源代码和数据库&…...

微信支付v2-02

...

企业的销售活动是什么?CRM销售管理系统给你答案

在企业业务中&#xff0c;销售活动是实现企业业绩目标的基本单元&#xff0c;起着奠基石的作用。CRM销售管理系统是销售活动管理的必备工具&#xff0c;帮助企业更好地开展销售活动。下面来说说企业的销售活动是什么&#xff1f; 什么是销售活动 简单来说&#xff0c;销售人员…...

【PG】PostgreSQL参数格式 配置文件格式

目录 一 PG参数格式 二 通过配置文件修改参数 postgresql.auto.conf文件 三 通过SQL修改参数 四 通过shell修改参数 五 管理配置文件内容 一 PG参数格式 所有参数名都是大小写不敏感的。每个参数都可以接受五种类型之一的值&#xff1a; 布尔、字符串、整数、 浮点数或枚…...

应用层协议 HTTP

一、应用层协议 我们已经学过 TCP/IP , 已然知道数据能从客户端进程经过路径选择跨网络传送到服务器端进程。 我们还需要知道的是&#xff0c;我们把数据从 A 端传送到 B 端&#xff0c; TCP/IP 解决的是顺丰的功能&#xff0c;而两端还要对数据进行加工处理或者使用&#xf…...

Springboot+vue的应急救援物资管理系统,Javaee项目,springboot vue前后端分离项目。

演示视频&#xff1a; Springbootvue的应急救援物资管理系统&#xff0c;Javaee项目&#xff0c;springboot vue前后端分离项目。 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的前后端分离的应急救援物资管理系统&#xff0c;采用M&#xff08;model&#xff09;V&…...

蓝桥杯c++新手如何起步?快马生成带详解的入门代码示例

作为一名刚接触蓝桥杯C竞赛的新手&#xff0c;最头疼的往往不是算法本身&#xff0c;而是连基础语法都还没摸透就要面对复杂题目。最近在准备比赛时&#xff0c;我发现用传统方式学习效率很低——手动敲完代码后&#xff0c;经常因为不熟悉语法细节卡壳&#xff0c;调试半天也找…...

AIGlasses_for_navigation视频分割教程:上传→处理→下载→验证全流程详解

AIGlasses_for_navigation视频分割教程&#xff1a;上传→处理→下载→验证全流程详解 你是不是遇到过这样的场景&#xff1a;手里有一段视频&#xff0c;想快速找出里面的特定物体&#xff0c;比如盲道、斑马线&#xff0c;或者红绿灯&#xff1f;手动一帧一帧看&#xff0c;…...

利用快马平台十分钟搭建yolo目标检测web演示原型

最近在尝试用YOLO算法做目标检测的Web演示&#xff0c;发现用InsCode(快马)平台可以超级快地搭建出原型。整个过程比我预想的简单太多&#xff0c;从零开始到实际运行只用了十分钟左右&#xff0c;特别适合想快速验证想法的时候用。这里记录下我的实现思路和具体步骤&#xff0…...

简单三步:用Qwen3语义雷达,为你的网站添加智能搜索功能

简单三步&#xff1a;用Qwen3语义雷达&#xff0c;为你的网站添加智能搜索功能 1. 为什么需要语义搜索&#xff1f; 传统的网站搜索功能大多基于关键词匹配&#xff0c;这种技术存在明显局限。当用户搜索"如何解决电脑卡顿"时&#xff0c;如果知识库中只有"提…...

为什么92%的FastAPI AI服务仍在用阻塞式响应?(深度剖析async def vs sync def在LLM流式场景下的内存泄漏与协程死锁)

第一章&#xff1a;FastAPI 2.0异步AI流式响应的核心价值与演进脉络在大模型服务规模化部署的背景下&#xff0c;传统同步HTTP响应已难以满足低延迟、高吞吐、用户体验敏感的AI交互场景。FastAPI 2.0通过深度整合Python 3.11原生异步运行时、优化ASGI中间件栈及重构StreamingRe…...

突破4大硬件限制:老旧Windows设备升级Windows 11的3维优化方案

突破4大硬件限制&#xff1a;老旧Windows设备升级Windows 11的3维优化方案 【免费下载链接】OpenCore-Legacy-Patcher 体验与之前一样的macOS 项目地址: https://gitcode.com/GitHub_Trending/op/OpenCore-Legacy-Patcher 老旧设备升级Windows 11的价值解析 在数字化快…...

FLUX.1文生图+SDXL风格保姆级教程:5分钟搞定AI绘画,新手也能出大片

FLUX.1文生图SDXL风格保姆级教程&#xff1a;5分钟搞定AI绘画&#xff0c;新手也能出大片 1. 为什么选择这个组合&#xff1f; FLUX.1-dev-fp8-dit与SDXL Prompt Styler的组合&#xff0c;是目前AI绘画领域最易上手且效果惊艳的解决方案之一。这个组合最大的特点是&#xff1…...

Screencast-Keys故障速查:按键显示功能的3大场景化一站式实战解决方案

Screencast-Keys故障速查&#xff1a;按键显示功能的3大场景化一站式实战解决方案 【免费下载链接】Screencast-Keys Blender Add-on: Screencast Keys 项目地址: https://gitcode.com/gh_mirrors/sc/Screencast-Keys Screencast-Keys是Blender的一款实用插件&#xff0…...

Qwen2.5-0.5B Instruct在软件测试中的自动化应用

Qwen2.5-0.5B Instruct在软件测试中的自动化应用 1. 引言 软件测试是确保产品质量的关键环节&#xff0c;但传统测试方法往往耗时费力。开发人员需要编写大量测试用例&#xff0c;执行重复的测试流程&#xff0c;还要分析复杂的测试结果。这个过程不仅枯燥&#xff0c;还容易…...

Youtu-Parsing工业文档解析:设备说明书表格+示意图+技术参数提取

Youtu-Parsing工业文档解析&#xff1a;设备说明书表格示意图技术参数提取 1. 引言&#xff1a;当工业文档遇上智能解析 想象一下这个场景&#xff1a;你是一家设备制造公司的技术工程师&#xff0c;手头有一份50页的设备说明书PDF&#xff0c;里面密密麻麻全是技术参数表格、…...