当前位置: 首页 > news >正文

从 0 到 1 ,手把手教你编写《消息队列》项目(Java实现) —— 创建虚拟机

文章目录

  • 一、虚拟机
  • 二、关于消息的API
    • 发布消息
      • 直接交换机 DIRECT 转发规则
      • 扇出交换机 FANOUT 转发规则
      • 主题交换机 TOPIC 转发规则
      • 匹配规则
      • Router类
    • 订阅消息
      • 消费者
      • 队列如何给订阅的消费者发送消息
      • 自动发送消息至订阅者
    • 应答消息
  • 三、代码编写


一、虚拟机

接下来要创建虚拟机,每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.同时提供 api 供上层调用.

在这里咱们实现的单虚拟机,并没有提供创建虚拟机,销毁虚拟机的功能,但是为了方便后续的扩展,咱们要约定好如何区分多个虚拟机之间的交换机,队列,绑定关系.

不同虚拟机当然可以拥有相同名字的交换机等.
比如
虚拟机A中 拥有 交换机C,
虚拟机B中 拥有 交换机C,
咱们视为以上情况是合法的.

咱们这里采取的方案是,在客户提供的交换机等的身份标识(交换机名字),前加上虚拟机的名字.
即 客户要在虚拟机 VirtualHostA 中创建交换机 exchangeC,咱们服务器存储的交换机名字是 VirtualHostAexchangeC.

当然也有其他方案,大家可以自由发挥.

二、关于消息的API

这是虚拟机要提供给上层的API
在这里插入图片描述

前6个API咱们已经写好了,只需要直接调用下层的API即可.
咱们现在来考虑后 7 - 9 这三个API的实现.

发布消息

发布消息API:其实就是生产者将消息发送给对应的交换机,交换机再根据不同的转发规则,转发给与之相绑定且符合规则的消息队列.

绑定关系 Binding 中有一个 bindingKey 属性
消息 Message 中 有一个 routingKey 属性

下面就来讲解一下三种交换机的转发规则已经这两个 Key 的不同含义.

直接交换机 DIRECT 转发规则

在直接交换机中,
bindingKye是无意义的,
routingKey是要转发到的队列的队列名.

直接交换机的转发规则, 是无视 bindingKey的,即 直接交换机是否与这个队列绑定都没有关系,而直接将消息转发到 routingKey指定的队列名的队列中.


扇出交换机 FANOUT 转发规则

在扇出交换机中,
bindingKye是无意义的,
routingKey是无意义的.

扇出交换机的转发规则,是将收到的消息转发到与之绑定的所有队列中.与bindingKye和routingKey是没有任何关系的.


主题交换机 TOPIC 转发规则

在主题交换机中,
bindingKey是创建绑定时,给绑定指定的特殊字符串(相当于一把锁),
routingKey是转发消息时,给消息指定的特殊字符串(相当于一把钥匙).

主题交换机的转发规则,是将收到的消息的routingKey与绑定的所有队列中的 bindingKey 进行匹配,当且仅当匹配成功时,才将消息转发给该队列.

匹配规则

routingKey规则

  • 由数字,字母,下划线组成
  • 使用 . 将routingKey分成多个部分.

bindingKey规则

  • 由数字,字母,下划线组成
  • 使用 . 将routingKey分成多个部分.
  • 支持两种特殊的符号作为通配符 * 与 # (*和#必须是作为被 . 分割出来的单独部分如 aaa*.bb就是非法的)
    * 可以匹配任何一个独立的部分
    # 可以匹配0个或多个的独立部分

匹配规则
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

Router类

在core包中创建 Router类,来完成对bindingKey与routingKey的校验与匹配.

/*** 这个类用来检查 bindingKey与routingKey 是否合法* 以及 bindingKey与routingKey 的匹配功能,* 以及 根据不同交换机的转发规则判断 消息 Message 是否可以转发给对应的绑定队列*/
public class Router {// bindingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.// 检查 BindingKey 是否合法public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为约定的).// 这样约定是因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性 提升不大~~// 1. aaa.#.#.bbb    => 非法// 2. aaa.#.*.bbb    => 非法// 3. aaa.*.#.bbb    => 非法// 4. aaa.*.*.bbb    => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}// 检查 RoutingKey 是否合法// routingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分public boolean checkRoutingKey(String routingKey) {if (routingKey.length() == 0) {// 空字符串. 合法的情况. 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 ""return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);// 判定该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}// 判定该字母是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}// 判定该字母是否是阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}// 判定是否是 _ 或者 .if (ch == '_' || ch == '.') {continue;}// 该字符, 不是上述任何一种合法情况, 就直接返回 falsereturn false;}// 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 truereturn true;}// 用来判断该消息是否可以转发给这个绑定对应的队列public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException {// 根据不同的 exchangeType 使用不同的转发规则if (exchangeType == ExchangeType.FANOUT) {// 如果是 FANOUT 类型,则该交换机上绑定的所有队列都需要转发return true;} else if (exchangeType == ExchangeType.TOPIC) {return routeTopic(binding,message);} else {throw new MqException("[Router] 交换机类型非法! exchangeType="+exchangeType);}}// 用来匹配 bindingKey与routingKeyprivate boolean routeTopic(Binding binding, Message message) {// 先把这两个 key 进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 引入两个下标, 指向上述两个数组. 初始情况下都为 0int bindingIndex = 0;int routingIndex = 0;// 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 forwhile (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {if (bindingTokens[bindingIndex].equals("*")) {// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!bindingIndex++;routingIndex++;continue;} else if (bindingTokens[bindingIndex].equals("#")) {// 如果遇到 #, 需要先看看有没有下一个位置.bindingIndex++;if (bindingIndex == bindingTokens.length) {// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!return true;}// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex == -1) {// 没找到匹配的结果. 匹配失败return false;}// 找到的匹配的情况, 继续往后匹配.bindingIndex++;routingIndex++;} else {// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex++;routingIndex++;}}// [情况五] 判定是否是双方同时到达末尾// 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失败的.if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if (routingTokens[i].equals(bindingToken)) {return i;}}return -1;}
}

订阅消息

消费者

咱们要创建一个消费者类,其中有消费者的信息,以及该消费者订阅的队列的名字,
以及消息应答模式,以及回调函数.

回调函数是什么?
是让消费者自己设定一个函数,当有新的消息转发给该消费者后,就执行这个回调函数

在这里插入图片描述

/*** 消费者的回调函数*/
@FunctionalInterface
public interface Consumer {// Delivery 是 “投递” 的意思,这个方法预期是在每次服务器收到消息之后,来调用// 通过这个方法把消息推送给对应的消费者// 这里的方法名与参数列表都是参考 RabbitMQ 的void handleDelivery(String consumerTag, BasicProperties basicProperties,byte[] body) throws MqException, IOException;
}
/*** 表示一个消费者(完整的执行环境)*/
@Data
public class ConsumerEnv {// 消费者信息private String consumerTag;// 订阅队列的名字private String queueName;// true -> 自动应答, false -> 手动应答private boolean autoAck;// 回调函数private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}
}

队列如何给订阅的消费者发送消息

这里咱们要想清楚的是,一个队列可以有多个消费者,
新来的消息要转发给哪个消费者呢?
咱们在这里采取轮询策略,即让消费者排队,依次将消息发送给消费者,当消费者收到消息后,则移动到队伍的最后等待下个消息.

因此咱们要给核心类 Message类再增加几个属性和方法,来管理消费者,

/*** 表示一个存储消息的队列* MSG =》Message* 消息队列的使用者是消费者*/
@Data
public class MSGQueue {// 表示队列的身份标识private String name;// 表示队列是否持久化private boolean durable = false;// true -> 这个队列只能被一个消费者使用,false -> 大家都能使用这个队列// 后续代码不实现相关功能private boolean exclusive = false;// true -> 没人使用后,自动删除,false -> 没人使用,不自动删除private boolean autoDelete = false;// 表示扩展参数,后续代码没有实现private Map<String,Object> arguments = new HashMap<>();// 当前队列有哪些消费者订阅了private List<ConsumerEnv> consumerEnvList = new ArrayList<>();// 记录当前取到了第几个消费者,方便实现轮询策略private AtomicInteger consumerSeq = new AtomicInteger(0);// 添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}// 删除订阅者暂不考虑// 挑选一个订阅者,用来处理当前的消息(按照轮询的方式)public ConsumerEnv chooseConsumer() {// 无人订阅if (consumerEnvList.size() == 0) {return null;}int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}public String getArguments() {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}public void setArguments(String arguments) {ObjectMapper objectMapper = new ObjectMapper();try {this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public void setArguments(Map<String,Object> arguments) {this.arguments = arguments;}public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key,Object value) {arguments.put(key, value);}
}

自动发送消息至订阅者

那么消费者要如何拿到消息呢?即如何将消息发送给消费者,咱们这里采取的是自动发送,即队列中来了新消息,就自动将新消息发送给订阅了这个队列的消费者.

咱们实现的方法是,使用一个阻塞队列,当生产者发布消息到交换机时,交换机转发消息到对应的队列后,就把队列名当作令牌添加到这个阻塞队列中,再配置一个扫描线程,去时刻扫描这个阻塞队列中是否有新的令牌了,有了新令牌,则根据令牌去对应的队列中,去把新消息安装轮询策略转发给消费者.


应答消息

应答消息共有两种模式.

  • 自动应答:将消息发送给消费者就算应答了(不关心消费者收没收到,相当于没应答)
  • 手动应答:需要消费者手动调用应答方法(确保消费者收到消息了)

三、代码编写

/*** 通过这个类, 来表示 虚拟主机.* 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.* 同时提供 api 供上层调用.* 针对 VirtualHost 这个类, 作为业务逻辑的整合者, 就需要对于代码中抛出的异常进行处理了.*/@Data
public class VirtualHost {private String virtualHostName;private Router router = new Router();private DiskDataCenter diskDataCenter = new DiskDataCenter();private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();// 操作交换机的锁对象,防止多线程操作交换机时,出现线程安全问题,如 创建了两个拥有相同身份标识的交换机private final Object exchangeLocker = new Object();// 操作队列的锁对象,防止多线程,操作队列时,出现线程安全问题,如 创建了两个拥有相同身份标识的队列private final Object queueLocker = new Object();// 消费者管理中心ConsumerManager consumerManager = new ConsumerManager(this);public VirtualHost(String virtualHostName) {this.virtualHostName = virtualHostName;// MemoryDataCenter 并不需要初始化,当 new MemoryDataCenter();时,里面所需的数据结构也都已经创建好了// DiskDataCenter 就需要初始化操作,去建库建表建文件和初始化数据的设定diskDataCenter.init();try {// 将硬盘中已有的数据恢复到内存中memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();}}// 创建交换机// 如果交换机不存在, 就创建. 如果存在, 直接返回.// 返回值是 boolean. 创建成功, 返回 true. 失败返回 falsepublic boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String,Object> arguments) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 把交换机的名字, 加上虚拟主机作为前缀.// 1.判断交换机是否存在,直接从内存查询if (memoryDataCenter.getExchange(exchangeName) != null) {// 该交换机已经存在!System.out.println("[VirtualHost] 交换机已经存在! exchangeName=" + exchangeName);return true;}// 2.创建交换机Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3.把交换机对象写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}// 4.把交换机对象写入内存memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建成功! exchangeName=" + exchangeName);// 上述逻辑, 先写硬盘, 后写内存. 目的就是因为硬盘更容易写失败. 如果硬盘写失败了, 内存就不写了.// 要是先写内存, 内存写成功了, 硬盘写失败了, 还需要把内存的数据给再删掉. 就比较麻烦了.}return true;} catch (Exception e) {System.out.println("[VirtualHost] 交换机创建失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}// 删除交换机public boolean exchangeDelete(String exchangeName) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 1.判断交换机是否存在Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if (existsExchange == null) {throw new MqException("[virtualHostName] 交换机不存在无法删除!");}if (existsExchange.isDurable()) {// 2.删除硬盘上的数据diskDataCenter.deleteExchange(exchangeName);}// 3.删除内存上的数据memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}// 创建队列public boolean queueuDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments) {// 给队列名字加前缀queueName = virtualHostName + queueName;try {synchronized (queueLocker) {// 1.判断队列是否存在if (memoryDataCenter.getQueue(queueName) != null) {System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName);return true;}// 2.创建队列MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3.向硬盘中写入数据if (durable) {diskDataCenter.insertQueue(queue);}// 4.向内存中写入数据memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName);e.printStackTrace();return false;}}// 删除队列public boolean queueDelete(String queueeName) {queueeName = virtualHostName + queueeName;try {synchronized (queueLocker) {MSGQueue existsQueue = memoryDataCenter.getQueue(queueeName);// 1.判断交换机是否存在if (existsQueue == null) {throw new MqException("[virtualHostName] 队列不存在无法删除!");}// 2.删除硬盘上的数据if (existsQueue.isDurable()) {diskDataCenter.deleteQueue(queueeName);}// 3.删除内存上的数据memoryDataCenter.deleteQueue(queueeName);System.out.println("[VirtualHost] 队列删除成功! queueeName=" + queueeName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 队列删除失败! queueeName=" + queueeName);e.printStackTrace();return false;}}// 创建绑定public boolean bindingDeclare(String exchangeName,String queueName,String bindingKey) {exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在,对应的交换机 队列是否存在,bindingKey是否合法if (memoryDataCenter.getBinding(exchangeName, queueName) != null) {return true;}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}if (!router.checkBindingKey(bindingKey)) {throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);}// 2.创建绑定Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 3.写入硬盘if (exchange.isDurable() && queue.isDurable()) {diskDataCenter.insertBinding(binding);}// 4.写入内存memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName+ ", queueName=" + queueName);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName+ ", queueName=" + queueName);e.printStackTrace();return false;}}// 删除绑定public boolean bindingDelete(String exchangeName,String queueName) {exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在,对应的交换机 队列是否存在,bindingKey是否合法Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if (binding == null) {throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);}// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.diskDataCenter.deleteBinding(binding);// 3. 删除内存的数据memoryDataCenter.deleteBinding(binding);System.out.println("[VirtualHost] 删除绑定成功!");}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 删除绑定失败!");e.printStackTrace();return false;}}// 发送消息到指定的交换机/队列中public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {exchangeName = virtualHostName + exchangeName;// 1.检查 routingKey是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);}// 2.查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 3.判断交换机绑定类型if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.// 4.查找对应的队列String queueName = virtualHostName + routingKey;MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}// 5.构造 messageMessage message = Message.createMessageWithId(routingKey, basicProperties, body);// 6.发送消息sendMessage(queue,message);return true;} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBinding(exchangeName);for (Map.Entry<String,Binding> entry : bindingsMap.entrySet()) {// 6.获取绑定对象,判断对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null){// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());continue;}// 7.构造消息Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 8.判断该消息是否可以发送给该队列if (router.route(exchange.getType(),binding,message)) {sendMessage(queue,message);}}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败!");e.printStackTrace();return false;}}// 发送消息到硬盘与内存private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息,即 把消息写入到 硬盘 和 内存 中int deliverMode = message.getDeliverMode();// deliverMode 为 1 不持久化,为 2 持久化if (deliverMode == 2) {diskDataCenter.sendMessage(queue,message);}// 写入内存memoryDataCenter.sendMessage(queue,message);// 通知消费者可以消费消息了consumerManager.notifyConsume(queue.getName());}// 订阅消息// 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后,应答的方式  为 true 自动应答,为 false 手动应答// consumer: 是一个回调函数,此处类型设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda样子public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[VirtualHost] basicConsume成功 queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume失败 queueName=" + queueName);e.printStackTrace();return false;}}// 消息的手动应答public boolean basicAck(String queueName,String messageId) {queueName = virtualHostName + queueName;try {// 1.获取消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);}// 2.删除硬盘上的消息if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}// 3.删除消息中心的消息memoryDataCenter.removeMessage(messageId);// 4.删除待确认消息中的消息memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName + ", messageId="+ messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败! 消息被成功确认! queueName=" + queueName + ", messageId="+ messageId);e.printStackTrace();return false;}}}

相关文章:

从 0 到 1 ,手把手教你编写《消息队列》项目(Java实现) —— 创建虚拟机

文章目录 一、虚拟机二、关于消息的API发布消息直接交换机 DIRECT 转发规则扇出交换机 FANOUT 转发规则主题交换机 TOPIC 转发规则匹配规则Router类 订阅消息消费者队列如何给订阅的消费者发送消息自动发送消息至订阅者 应答消息 三、代码编写 一、虚拟机 接下来要创建虚拟机,…...

GIT版本控制--前言

欢迎来到《GIT版本控制》专栏&#xff01;在当今软件开发和协作的世界中&#xff0c;版本控制是不可或缺的工具之一。无论您是一名初学者&#xff0c;一位经验丰富的开发者&#xff0c;还是一个项目团队的成员&#xff0c;都有可能会受益于对GIT的深入了解。 GIT是一个强大的分…...

由于 MAC 地址的问题,导致网络不通的原因和分析

由于 MAC 地址的问题&#xff0c;导致网络不通的原因和分析 将现象及原因分析发给大家&#xff0c;供大家参考&#xff0c;以后有类似问题时有个解决问题的参考开发板网络不通&#xff0c;也抓不到包&#xff0c;折腾了好久&#xff0c;将电脑和开发板用网线直连&#xff0c;结…...

游戏开发中的设计模式

单例模式 实例化单一对象&#xff0c;懒加载 //单例模式 class GameManagerSingleton {private constructor(){}private static instance:GameManagerSingleton;public static Instance(){if(!GameManagerSingleton.instance){this.instance new GameManagerSingleton();}re…...

React核心原理与实际开发

学习目标 React是啥&#xff1f; 官方定义&#xff1a;将前端请求获取到的数据渲染为HTML视图的JavaScript库。 一、React入门 1、React项目创建 直接创建react&#xff0c;使用初始化会创建package.json npm init -y再安装 2、React基本使用 使用纯JS创建ReactDOM&#…...

Springboot+vue的企业OA管理系统(有报告),Javaee项目,springboot vue前后端分离项目。

演示视频&#xff1a; Springbootvue的企业OA管理系统&#xff08;有报告&#xff09;&#xff0c;Javaee项目&#xff0c;springboot vue前后端分离项目。 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的前后端分离的企业OA管理系统&#xff0c;采用M&#xff08;m…...

3、字符设备驱动框架和开发步骤

一、Linux内核对文件的分类 Linux的文件种类 1、-&#xff1a;普通文件2、d&#xff1a;目录文件3、p&#xff1a;管道文件4、s&#xff1a;本地socket文件5、l&#xff1a;链接文件6、c&#xff1a;字符设备7、b&#xff1a;块设备 Linux内核按驱动程序实现模型框架的不同&…...

[MySQL]基础篇

文章目录 1. MySQL基本使用1.1 MySQL的启动和登录1.1.1 MySQL的启动1.1.2 MySQL的客户端连接 1.2 数据模型 2. SQL2.1 SQL类型2.1.1 数值类型2.1.2 字符串类型2.1.3 日期类型 2.2 DDL2.2.1 数据库操作2.2.2 表操作 - 查询2.2.3 表操作 - 创建表2.2.4 表操作 - 修改 2.3 DML2.3.…...

Meta Semantic Template for Evaluation of Large Language Models

本文是LLM系列文章&#xff0c;针对《Meta Semantic Template for Evaluation of Large Language Models》的翻译。 大型语言模型评估的元语义模板 摘要1 引言2 相关工作3 方法4 实验5 结论 摘要 大型语言模型(llm)是否真正理解语言的语义&#xff0c;或者只是记住训练数据?…...

Git相关知识(1)

目录 1.初识Git 1.基础知识 2.centos中下载 2.基本操作 1.创建本地仓库 2.配置本地仓库 3.版本库、工作区、暂存区 4.添加文件 5.add和commit对git文件的作用 6.修改文件 7.版本回退 8.撤销修改 9.删除文件 3.分支操作 1.HEAD与分支 2.创建分支 3.删除分支 …...

pytorch中nn.DataParallel多次使用

pytorch中nn.DataParallel多次使用 import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader# 定义模型 class MyModel(nn.Module):def __init__(self):super(MyModel, self).__init__()self.fc nn.Linear(10, 1)def forwa…...

制作电商页面(Html)

任务 制作一个电商页面&#xff0c;要求所卖物品清晰&#xff0c;页面色调清晰&#xff0c;要有主页和详情页。 网站所买物品&#xff1a;书籍 色调&#xff1a;#FF2400 橙红色 代码 主页HTML代码&#xff1a; <html><head><meta charset"utf-8"…...

Android Sutdio依赖Snapshot版本,无法同步最新的包

起因 局域网中搭建了Nexus托管本地打包的aar&#xff0c;正常情况下&#xff0c;把修改完成的库推送到仓库后&#xff0c;其他项目引用Snapshot版本的依赖&#xff0c;同步后会马上下载最新的包&#xff0c;但是当第二次推送后&#xff0c;就没有重新下载最新的包&#xff0c;…...

Feign调用异常触发降级捕获异常

通过配置fallbackFactory来捕获异常信息&#xff0c;代码如下 FeignClient(name "user", fallbackFactory UserFallBackFactory.class) public interface UserFeign {PostMapping("/get/list")Map getList();}Component public class UserFallBackFacto…...

Springboot 音乐网站管理系统idea开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 springboot 音乐网站管理系统是一套完善的信息系统&#xff0c;结合springboot框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用springboot框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统 具有完整的源代码和数据库&…...

微信支付v2-02

...

企业的销售活动是什么?CRM销售管理系统给你答案

在企业业务中&#xff0c;销售活动是实现企业业绩目标的基本单元&#xff0c;起着奠基石的作用。CRM销售管理系统是销售活动管理的必备工具&#xff0c;帮助企业更好地开展销售活动。下面来说说企业的销售活动是什么&#xff1f; 什么是销售活动 简单来说&#xff0c;销售人员…...

【PG】PostgreSQL参数格式 配置文件格式

目录 一 PG参数格式 二 通过配置文件修改参数 postgresql.auto.conf文件 三 通过SQL修改参数 四 通过shell修改参数 五 管理配置文件内容 一 PG参数格式 所有参数名都是大小写不敏感的。每个参数都可以接受五种类型之一的值&#xff1a; 布尔、字符串、整数、 浮点数或枚…...

应用层协议 HTTP

一、应用层协议 我们已经学过 TCP/IP , 已然知道数据能从客户端进程经过路径选择跨网络传送到服务器端进程。 我们还需要知道的是&#xff0c;我们把数据从 A 端传送到 B 端&#xff0c; TCP/IP 解决的是顺丰的功能&#xff0c;而两端还要对数据进行加工处理或者使用&#xf…...

Springboot+vue的应急救援物资管理系统,Javaee项目,springboot vue前后端分离项目。

演示视频&#xff1a; Springbootvue的应急救援物资管理系统&#xff0c;Javaee项目&#xff0c;springboot vue前后端分离项目。 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的前后端分离的应急救援物资管理系统&#xff0c;采用M&#xff08;model&#xff09;V&…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八

现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet&#xff0c;点击确认后如下提示 最终上报fail 解决方法 内核升级导致&#xff0c;需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别

UnsatisfiedLinkError 在对接硬件设备中&#xff0c;我们会遇到使用 java 调用 dll文件 的情况&#xff0c;此时大概率出现UnsatisfiedLinkError链接错误&#xff0c;原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用&#xff0c;结果 dll 未实现 JNI 协…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)

1.获取 authorizationCode&#xff1a; 2.利用 authorizationCode 获取 accessToken&#xff1a;文档中心 3.获取手机&#xff1a;文档中心 4.获取昵称头像&#xff1a;文档中心 首先创建 request 若要获取手机号&#xff0c;scope必填 phone&#xff0c;permissions 必填 …...

排序算法总结(C++)

目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指&#xff1a;同样大小的样本 **&#xff08;同样大小的数据&#xff09;**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...

MySQL 部分重点知识篇

一、数据库对象 1. 主键 定义 &#xff1a;主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 &#xff1a;确保数据的完整性&#xff0c;便于数据的查询和管理。 示例 &#xff1a;在学生信息表中&#xff0c;学号可以作为主键&#xff…...

AI语音助手的Python实现

引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...