从 0 到 1 ,手把手教你编写《消息队列》项目(Java实现) —— 创建虚拟机
文章目录
- 一、虚拟机
- 二、关于消息的API
- 发布消息
- 直接交换机 DIRECT 转发规则
- 扇出交换机 FANOUT 转发规则
- 主题交换机 TOPIC 转发规则
- 匹配规则
- Router类
- 订阅消息
- 消费者
- 队列如何给订阅的消费者发送消息
- 自动发送消息至订阅者
- 应答消息
- 三、代码编写
一、虚拟机
接下来要创建虚拟机,每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.同时提供 api 供上层调用.
在这里咱们实现的单虚拟机,并没有提供创建虚拟机,销毁虚拟机的功能,但是为了方便后续的扩展,咱们要约定好如何区分多个虚拟机之间的交换机,队列,绑定关系.
不同虚拟机当然可以拥有相同名字的交换机等.
比如
虚拟机A中 拥有 交换机C,
虚拟机B中 拥有 交换机C,
咱们视为以上情况是合法的.
咱们这里采取的方案是,在客户提供的交换机等的身份标识(交换机名字),前加上虚拟机的名字.
即 客户要在虚拟机 VirtualHostA 中创建交换机 exchangeC,咱们服务器存储的交换机名字是 VirtualHostAexchangeC.
当然也有其他方案,大家可以自由发挥.
二、关于消息的API
这是虚拟机要提供给上层的API
前6个API咱们已经写好了,只需要直接调用下层的API即可.
咱们现在来考虑后 7 - 9 这三个API的实现.
发布消息
发布消息API:其实就是生产者将消息发送给对应的交换机,交换机再根据不同的转发规则,转发给与之相绑定且符合规则的消息队列.
绑定关系 Binding 中有一个 bindingKey 属性
消息 Message 中 有一个 routingKey 属性
下面就来讲解一下三种交换机的转发规则已经这两个 Key 的不同含义.
直接交换机 DIRECT 转发规则
在直接交换机中,
bindingKye是无意义的,
routingKey是要转发到的队列的队列名.
直接交换机的转发规则, 是无视 bindingKey的,即 直接交换机是否与这个队列绑定都没有关系,而直接将消息转发到 routingKey指定的队列名的队列中.
扇出交换机 FANOUT 转发规则
在扇出交换机中,
bindingKye是无意义的,
routingKey是无意义的.
扇出交换机的转发规则,是将收到的消息转发到与之绑定的所有队列中.与bindingKye和routingKey是没有任何关系的.
主题交换机 TOPIC 转发规则
在主题交换机中,
bindingKey是创建绑定时,给绑定指定的特殊字符串(相当于一把锁),
routingKey是转发消息时,给消息指定的特殊字符串(相当于一把钥匙).
主题交换机的转发规则,是将收到的消息的routingKey与绑定的所有队列中的 bindingKey 进行匹配,当且仅当匹配成功时,才将消息转发给该队列.
匹配规则
routingKey规则
- 由数字,字母,下划线组成
- 使用 . 将routingKey分成多个部分.
bindingKey规则
- 由数字,字母,下划线组成
- 使用 . 将routingKey分成多个部分.
- 支持两种特殊的符号作为通配符 * 与 # (*和#必须是作为被 . 分割出来的单独部分如 aaa*.bb就是非法的)
* 可以匹配任何一个独立的部分
# 可以匹配0个或多个的独立部分
匹配规则
Router类
在core包中创建 Router类,来完成对bindingKey与routingKey的校验与匹配.
/*** 这个类用来检查 bindingKey与routingKey 是否合法* 以及 bindingKey与routingKey 的匹配功能,* 以及 根据不同交换机的转发规则判断 消息 Message 是否可以转发给对应的绑定队列*/
public class Router {// bindingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.// 检查 BindingKey 是否合法public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况; aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为约定的).// 这样约定是因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性 提升不大~~// 1. aaa.#.#.bbb => 非法// 2. aaa.#.*.bbb => 非法// 3. aaa.*.#.bbb => 非法// 4. aaa.*.*.bbb => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}// 检查 RoutingKey 是否合法// routingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分public boolean checkRoutingKey(String routingKey) {if (routingKey.length() == 0) {// 空字符串. 合法的情况. 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 ""return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);// 判定该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}// 判定该字母是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}// 判定该字母是否是阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}// 判定是否是 _ 或者 .if (ch == '_' || ch == '.') {continue;}// 该字符, 不是上述任何一种合法情况, 就直接返回 falsereturn false;}// 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 truereturn true;}// 用来判断该消息是否可以转发给这个绑定对应的队列public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException {// 根据不同的 exchangeType 使用不同的转发规则if (exchangeType == ExchangeType.FANOUT) {// 如果是 FANOUT 类型,则该交换机上绑定的所有队列都需要转发return true;} else if (exchangeType == ExchangeType.TOPIC) {return routeTopic(binding,message);} else {throw new MqException("[Router] 交换机类型非法! exchangeType="+exchangeType);}}// 用来匹配 bindingKey与routingKeyprivate boolean routeTopic(Binding binding, Message message) {// 先把这两个 key 进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 引入两个下标, 指向上述两个数组. 初始情况下都为 0int bindingIndex = 0;int routingIndex = 0;// 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 forwhile (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {if (bindingTokens[bindingIndex].equals("*")) {// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!bindingIndex++;routingIndex++;continue;} else if (bindingTokens[bindingIndex].equals("#")) {// 如果遇到 #, 需要先看看有没有下一个位置.bindingIndex++;if (bindingIndex == bindingTokens.length) {// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!return true;}// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex == -1) {// 没找到匹配的结果. 匹配失败return false;}// 找到的匹配的情况, 继续往后匹配.bindingIndex++;routingIndex++;} else {// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex++;routingIndex++;}}// [情况五] 判定是否是双方同时到达末尾// 比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的.if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if (routingTokens[i].equals(bindingToken)) {return i;}}return -1;}
}
订阅消息
消费者
咱们要创建一个消费者类,其中有消费者的信息,以及该消费者订阅的队列的名字,
以及消息应答模式,以及回调函数.
回调函数是什么?
是让消费者自己设定一个函数,当有新的消息转发给该消费者后,就执行这个回调函数
/*** 消费者的回调函数*/
@FunctionalInterface
public interface Consumer {// Delivery 是 “投递” 的意思,这个方法预期是在每次服务器收到消息之后,来调用// 通过这个方法把消息推送给对应的消费者// 这里的方法名与参数列表都是参考 RabbitMQ 的void handleDelivery(String consumerTag, BasicProperties basicProperties,byte[] body) throws MqException, IOException;
}
/*** 表示一个消费者(完整的执行环境)*/
@Data
public class ConsumerEnv {// 消费者信息private String consumerTag;// 订阅队列的名字private String queueName;// true -> 自动应答, false -> 手动应答private boolean autoAck;// 回调函数private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}
}
队列如何给订阅的消费者发送消息
这里咱们要想清楚的是,一个队列可以有多个消费者,
新来的消息要转发给哪个消费者呢?
咱们在这里采取轮询策略,即让消费者排队,依次将消息发送给消费者,当消费者收到消息后,则移动到队伍的最后等待下个消息.
因此咱们要给核心类 Message类再增加几个属性和方法,来管理消费者,
/*** 表示一个存储消息的队列* MSG =》Message* 消息队列的使用者是消费者*/
@Data
public class MSGQueue {// 表示队列的身份标识private String name;// 表示队列是否持久化private boolean durable = false;// true -> 这个队列只能被一个消费者使用,false -> 大家都能使用这个队列// 后续代码不实现相关功能private boolean exclusive = false;// true -> 没人使用后,自动删除,false -> 没人使用,不自动删除private boolean autoDelete = false;// 表示扩展参数,后续代码没有实现private Map<String,Object> arguments = new HashMap<>();// 当前队列有哪些消费者订阅了private List<ConsumerEnv> consumerEnvList = new ArrayList<>();// 记录当前取到了第几个消费者,方便实现轮询策略private AtomicInteger consumerSeq = new AtomicInteger(0);// 添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}// 删除订阅者暂不考虑// 挑选一个订阅者,用来处理当前的消息(按照轮询的方式)public ConsumerEnv chooseConsumer() {// 无人订阅if (consumerEnvList.size() == 0) {return null;}int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}public String getArguments() {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}public void setArguments(String arguments) {ObjectMapper objectMapper = new ObjectMapper();try {this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public void setArguments(Map<String,Object> arguments) {this.arguments = arguments;}public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key,Object value) {arguments.put(key, value);}
}
自动发送消息至订阅者
那么消费者要如何拿到消息呢?即如何将消息发送给消费者,咱们这里采取的是自动发送,即队列中来了新消息,就自动将新消息发送给订阅了这个队列的消费者.
咱们实现的方法是,使用一个阻塞队列,当生产者发布消息到交换机时,交换机转发消息到对应的队列后,就把队列名当作令牌添加到这个阻塞队列中,再配置一个扫描线程,去时刻扫描这个阻塞队列中是否有新的令牌了,有了新令牌,则根据令牌去对应的队列中,去把新消息安装轮询策略转发给消费者.
应答消息
应答消息共有两种模式.
- 自动应答:将消息发送给消费者就算应答了(不关心消费者收没收到,相当于没应答)
- 手动应答:需要消费者手动调用应答方法(确保消费者收到消息了)
三、代码编写
/*** 通过这个类, 来表示 虚拟主机.* 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.* 同时提供 api 供上层调用.* 针对 VirtualHost 这个类, 作为业务逻辑的整合者, 就需要对于代码中抛出的异常进行处理了.*/@Data
public class VirtualHost {private String virtualHostName;private Router router = new Router();private DiskDataCenter diskDataCenter = new DiskDataCenter();private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();// 操作交换机的锁对象,防止多线程操作交换机时,出现线程安全问题,如 创建了两个拥有相同身份标识的交换机private final Object exchangeLocker = new Object();// 操作队列的锁对象,防止多线程,操作队列时,出现线程安全问题,如 创建了两个拥有相同身份标识的队列private final Object queueLocker = new Object();// 消费者管理中心ConsumerManager consumerManager = new ConsumerManager(this);public VirtualHost(String virtualHostName) {this.virtualHostName = virtualHostName;// MemoryDataCenter 并不需要初始化,当 new MemoryDataCenter();时,里面所需的数据结构也都已经创建好了// DiskDataCenter 就需要初始化操作,去建库建表建文件和初始化数据的设定diskDataCenter.init();try {// 将硬盘中已有的数据恢复到内存中memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();}}// 创建交换机// 如果交换机不存在, 就创建. 如果存在, 直接返回.// 返回值是 boolean. 创建成功, 返回 true. 失败返回 falsepublic boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String,Object> arguments) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 把交换机的名字, 加上虚拟主机作为前缀.// 1.判断交换机是否存在,直接从内存查询if (memoryDataCenter.getExchange(exchangeName) != null) {// 该交换机已经存在!System.out.println("[VirtualHost] 交换机已经存在! exchangeName=" + exchangeName);return true;}// 2.创建交换机Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3.把交换机对象写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}// 4.把交换机对象写入内存memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建成功! exchangeName=" + exchangeName);// 上述逻辑, 先写硬盘, 后写内存. 目的就是因为硬盘更容易写失败. 如果硬盘写失败了, 内存就不写了.// 要是先写内存, 内存写成功了, 硬盘写失败了, 还需要把内存的数据给再删掉. 就比较麻烦了.}return true;} catch (Exception e) {System.out.println("[VirtualHost] 交换机创建失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}// 删除交换机public boolean exchangeDelete(String exchangeName) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 1.判断交换机是否存在Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if (existsExchange == null) {throw new MqException("[virtualHostName] 交换机不存在无法删除!");}if (existsExchange.isDurable()) {// 2.删除硬盘上的数据diskDataCenter.deleteExchange(exchangeName);}// 3.删除内存上的数据memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}// 创建队列public boolean queueuDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments) {// 给队列名字加前缀queueName = virtualHostName + queueName;try {synchronized (queueLocker) {// 1.判断队列是否存在if (memoryDataCenter.getQueue(queueName) != null) {System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName);return true;}// 2.创建队列MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3.向硬盘中写入数据if (durable) {diskDataCenter.insertQueue(queue);}// 4.向内存中写入数据memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName);e.printStackTrace();return false;}}// 删除队列public boolean queueDelete(String queueeName) {queueeName = virtualHostName + queueeName;try {synchronized (queueLocker) {MSGQueue existsQueue = memoryDataCenter.getQueue(queueeName);// 1.判断交换机是否存在if (existsQueue == null) {throw new MqException("[virtualHostName] 队列不存在无法删除!");}// 2.删除硬盘上的数据if (existsQueue.isDurable()) {diskDataCenter.deleteQueue(queueeName);}// 3.删除内存上的数据memoryDataCenter.deleteQueue(queueeName);System.out.println("[VirtualHost] 队列删除成功! queueeName=" + queueeName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 队列删除失败! queueeName=" + queueeName);e.printStackTrace();return false;}}// 创建绑定public boolean bindingDeclare(String exchangeName,String queueName,String bindingKey) {exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在,对应的交换机 队列是否存在,bindingKey是否合法if (memoryDataCenter.getBinding(exchangeName, queueName) != null) {return true;}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}if (!router.checkBindingKey(bindingKey)) {throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);}// 2.创建绑定Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 3.写入硬盘if (exchange.isDurable() && queue.isDurable()) {diskDataCenter.insertBinding(binding);}// 4.写入内存memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName+ ", queueName=" + queueName);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName+ ", queueName=" + queueName);e.printStackTrace();return false;}}// 删除绑定public boolean bindingDelete(String exchangeName,String queueName) {exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在,对应的交换机 队列是否存在,bindingKey是否合法Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if (binding == null) {throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);}// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.diskDataCenter.deleteBinding(binding);// 3. 删除内存的数据memoryDataCenter.deleteBinding(binding);System.out.println("[VirtualHost] 删除绑定成功!");}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 删除绑定失败!");e.printStackTrace();return false;}}// 发送消息到指定的交换机/队列中public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {exchangeName = virtualHostName + exchangeName;// 1.检查 routingKey是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);}// 2.查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 3.判断交换机绑定类型if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.// 4.查找对应的队列String queueName = virtualHostName + routingKey;MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}// 5.构造 messageMessage message = Message.createMessageWithId(routingKey, basicProperties, body);// 6.发送消息sendMessage(queue,message);return true;} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBinding(exchangeName);for (Map.Entry<String,Binding> entry : bindingsMap.entrySet()) {// 6.获取绑定对象,判断对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null){// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());continue;}// 7.构造消息Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 8.判断该消息是否可以发送给该队列if (router.route(exchange.getType(),binding,message)) {sendMessage(queue,message);}}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败!");e.printStackTrace();return false;}}// 发送消息到硬盘与内存private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息,即 把消息写入到 硬盘 和 内存 中int deliverMode = message.getDeliverMode();// deliverMode 为 1 不持久化,为 2 持久化if (deliverMode == 2) {diskDataCenter.sendMessage(queue,message);}// 写入内存memoryDataCenter.sendMessage(queue,message);// 通知消费者可以消费消息了consumerManager.notifyConsume(queue.getName());}// 订阅消息// 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后,应答的方式 为 true 自动应答,为 false 手动应答// consumer: 是一个回调函数,此处类型设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda样子public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[VirtualHost] basicConsume成功 queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume失败 queueName=" + queueName);e.printStackTrace();return false;}}// 消息的手动应答public boolean basicAck(String queueName,String messageId) {queueName = virtualHostName + queueName;try {// 1.获取消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);}// 2.删除硬盘上的消息if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}// 3.删除消息中心的消息memoryDataCenter.removeMessage(messageId);// 4.删除待确认消息中的消息memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName + ", messageId="+ messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败! 消息被成功确认! queueName=" + queueName + ", messageId="+ messageId);e.printStackTrace();return false;}}}
相关文章:

从 0 到 1 ,手把手教你编写《消息队列》项目(Java实现) —— 创建虚拟机
文章目录 一、虚拟机二、关于消息的API发布消息直接交换机 DIRECT 转发规则扇出交换机 FANOUT 转发规则主题交换机 TOPIC 转发规则匹配规则Router类 订阅消息消费者队列如何给订阅的消费者发送消息自动发送消息至订阅者 应答消息 三、代码编写 一、虚拟机 接下来要创建虚拟机,…...
GIT版本控制--前言
欢迎来到《GIT版本控制》专栏!在当今软件开发和协作的世界中,版本控制是不可或缺的工具之一。无论您是一名初学者,一位经验丰富的开发者,还是一个项目团队的成员,都有可能会受益于对GIT的深入了解。 GIT是一个强大的分…...
由于 MAC 地址的问题,导致网络不通的原因和分析
由于 MAC 地址的问题,导致网络不通的原因和分析 将现象及原因分析发给大家,供大家参考,以后有类似问题时有个解决问题的参考开发板网络不通,也抓不到包,折腾了好久,将电脑和开发板用网线直连,结…...
游戏开发中的设计模式
单例模式 实例化单一对象,懒加载 //单例模式 class GameManagerSingleton {private constructor(){}private static instance:GameManagerSingleton;public static Instance(){if(!GameManagerSingleton.instance){this.instance new GameManagerSingleton();}re…...

React核心原理与实际开发
学习目标 React是啥? 官方定义:将前端请求获取到的数据渲染为HTML视图的JavaScript库。 一、React入门 1、React项目创建 直接创建react,使用初始化会创建package.json npm init -y再安装 2、React基本使用 使用纯JS创建ReactDOM&#…...

Springboot+vue的企业OA管理系统(有报告),Javaee项目,springboot vue前后端分离项目。
演示视频: Springbootvue的企业OA管理系统(有报告),Javaee项目,springboot vue前后端分离项目。 项目介绍: 本文设计了一个基于Springbootvue的前后端分离的企业OA管理系统,采用M(m…...

3、字符设备驱动框架和开发步骤
一、Linux内核对文件的分类 Linux的文件种类 1、-:普通文件2、d:目录文件3、p:管道文件4、s:本地socket文件5、l:链接文件6、c:字符设备7、b:块设备 Linux内核按驱动程序实现模型框架的不同&…...

[MySQL]基础篇
文章目录 1. MySQL基本使用1.1 MySQL的启动和登录1.1.1 MySQL的启动1.1.2 MySQL的客户端连接 1.2 数据模型 2. SQL2.1 SQL类型2.1.1 数值类型2.1.2 字符串类型2.1.3 日期类型 2.2 DDL2.2.1 数据库操作2.2.2 表操作 - 查询2.2.3 表操作 - 创建表2.2.4 表操作 - 修改 2.3 DML2.3.…...
Meta Semantic Template for Evaluation of Large Language Models
本文是LLM系列文章,针对《Meta Semantic Template for Evaluation of Large Language Models》的翻译。 大型语言模型评估的元语义模板 摘要1 引言2 相关工作3 方法4 实验5 结论 摘要 大型语言模型(llm)是否真正理解语言的语义,或者只是记住训练数据?…...

Git相关知识(1)
目录 1.初识Git 1.基础知识 2.centos中下载 2.基本操作 1.创建本地仓库 2.配置本地仓库 3.版本库、工作区、暂存区 4.添加文件 5.add和commit对git文件的作用 6.修改文件 7.版本回退 8.撤销修改 9.删除文件 3.分支操作 1.HEAD与分支 2.创建分支 3.删除分支 …...

pytorch中nn.DataParallel多次使用
pytorch中nn.DataParallel多次使用 import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader# 定义模型 class MyModel(nn.Module):def __init__(self):super(MyModel, self).__init__()self.fc nn.Linear(10, 1)def forwa…...

制作电商页面(Html)
任务 制作一个电商页面,要求所卖物品清晰,页面色调清晰,要有主页和详情页。 网站所买物品:书籍 色调:#FF2400 橙红色 代码 主页HTML代码: <html><head><meta charset"utf-8"…...
Android Sutdio依赖Snapshot版本,无法同步最新的包
起因 局域网中搭建了Nexus托管本地打包的aar,正常情况下,把修改完成的库推送到仓库后,其他项目引用Snapshot版本的依赖,同步后会马上下载最新的包,但是当第二次推送后,就没有重新下载最新的包,…...
Feign调用异常触发降级捕获异常
通过配置fallbackFactory来捕获异常信息,代码如下 FeignClient(name "user", fallbackFactory UserFallBackFactory.class) public interface UserFeign {PostMapping("/get/list")Map getList();}Component public class UserFallBackFacto…...

Springboot 音乐网站管理系统idea开发mysql数据库web结构java编程计算机网页源码maven项目
一、源码特点 springboot 音乐网站管理系统是一套完善的信息系统,结合springboot框架和bootstrap完成本系统,对理解JSP java编程开发语言有帮助系统采用springboot框架(MVC模式开发),系统 具有完整的源代码和数据库&…...

微信支付v2-02
...

企业的销售活动是什么?CRM销售管理系统给你答案
在企业业务中,销售活动是实现企业业绩目标的基本单元,起着奠基石的作用。CRM销售管理系统是销售活动管理的必备工具,帮助企业更好地开展销售活动。下面来说说企业的销售活动是什么? 什么是销售活动 简单来说,销售人员…...
【PG】PostgreSQL参数格式 配置文件格式
目录 一 PG参数格式 二 通过配置文件修改参数 postgresql.auto.conf文件 三 通过SQL修改参数 四 通过shell修改参数 五 管理配置文件内容 一 PG参数格式 所有参数名都是大小写不敏感的。每个参数都可以接受五种类型之一的值: 布尔、字符串、整数、 浮点数或枚…...

应用层协议 HTTP
一、应用层协议 我们已经学过 TCP/IP , 已然知道数据能从客户端进程经过路径选择跨网络传送到服务器端进程。 我们还需要知道的是,我们把数据从 A 端传送到 B 端, TCP/IP 解决的是顺丰的功能,而两端还要对数据进行加工处理或者使用…...

Springboot+vue的应急救援物资管理系统,Javaee项目,springboot vue前后端分离项目。
演示视频: Springbootvue的应急救援物资管理系统,Javaee项目,springboot vue前后端分离项目。 项目介绍: 本文设计了一个基于Springbootvue的前后端分离的应急救援物资管理系统,采用M(model)V&…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
R语言AI模型部署方案:精准离线运行详解
R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...
安卓基础(Java 和 Gradle 版本)
1. 设置项目的 JDK 版本 方法1:通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分,设置 Gradle JDK 方法2:通过 Settings File → Settings... (或 CtrlAltS)…...

沙箱虚拟化技术虚拟机容器之间的关系详解
问题 沙箱、虚拟化、容器三者分开一一介绍的话我知道他们各自都是什么东西,但是如果把三者放在一起,它们之间到底什么关系?又有什么联系呢?我不是很明白!!! 就比如说: 沙箱&#…...
土建施工员考试:建筑施工技术重点知识有哪些?
《管理实务》是土建施工员考试中侧重实操应用与管理能力的科目,核心考查施工组织、质量安全、进度成本等现场管理要点。以下是结合考试大纲与高频考点整理的重点内容,附学习方向和应试技巧: 一、施工组织与进度管理 核心目标: 规…...

鸿蒙Navigation路由导航-基本使用介绍
1. Navigation介绍 Navigation组件是路由导航的根视图容器,一般作为Page页面的根容器使用,其内部默认包含了标题栏、内容区和工具栏,其中内容区默认首页显示导航内容(Navigation的子组件)或非首页显示(Nav…...
PostgreSQL 与 SQL 基础:为 Fast API 打下数据基础
在构建任何动态、数据驱动的Web API时,一个稳定高效的数据存储方案是不可或缺的。对于使用Python FastAPI的开发者来说,深入理解关系型数据库的工作原理、掌握SQL这门与数据库“对话”的语言,以及学会如何在Python中操作数据库,是…...

【向量库】Weaviate概述与架构解析
文章目录 一、什么是weaviate二、High-Level Architecture1. Core Components2. Storage Layer3. 组件交互流程 三、核心组件1. API Layer2. Schema Management3. Vector Indexing3.1. 查询原理3.2. 左侧:Search Process(搜索流程)3.3. 右侧&…...

claude3.7高阶玩法,生成系统架构图,国内直接使用
文章目录 零、前言一、操作指南操作指导 二、提示词模板三、实战图书管理系统通过4o模型生成系统描述通过claude3.7生成系统架构图svg代码转换成图片 在线考试系统通过4o模型生成系统描述通过claude3.7生成系统架构图svg代码转换成图片 四、感受 零、前言 现在很多AI大模型可以…...