当前位置: 首页 > news >正文

✔ ★Java项目——设计一个消息队列(五)【虚拟主机设计】

虚拟主机设计

  • 创建 VirtualHost
    • 实现构造⽅法和 getter
    • 创建交换机
    • 删除交换机
    • 创建队列
    • 删除队列
    • 创建绑定
    • 删除绑定
    • 发布消息 ★
    • 路由规则
      • 1) 实现 route ⽅法
      • 2) 实现 checkRoutingKeyValid
      • 3) 实现 checkBindingKeyValid
      • 4) 实现 routeTopic
      • 5) 匹配规则测试⽤例
      • 6) 测试 Router
    • 订阅消息
      • 1) 添加⼀个订阅者
      • 2) 创建订阅者管理管理类
      • 3) 添加令牌接⼝
      • 4) 实现添加订阅者
      • 给 MsgQueue 添加⼀个订阅者列表
      • 5) 实现扫描线程
      • 6) 实现消费消息
    • ⼩结
    • 消息确认
    • 测试 VirtualHost

在这里插入图片描述

在这里插入图片描述

创建 VirtualHost

创建 mqserver.VirtualHost
在这里插入图片描述
其中 Router ⽤来定义转发规则, ConsumerManager ⽤来实现消息消费. 这两个内容后续再介绍

实现构造⽅法和 getter

在这里插入图片描述

创建交换机

• 此处的 autoDelete, arguments 其实并没有使⽤. 只是先预留出来. (RabbitMQ 是⽀持的) .
• 约定, 交换机/队列的名字, 都加上 VirtualHostName 作为前缀. 这样不同 VirtualHost 中就可以存在
同名的交换机或者队列了.
• exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回. 因此不叫做 “exchangeCreate”.
• 先写硬盘, 后写内存. 因为写硬盘失败概率更⼤. 如果硬盘写失败了, 也就不必写内存了.

在这里插入图片描述

删除交换机

在这里插入图片描述

创建队列

在这里插入图片描述

删除队列

在这里插入图片描述

创建绑定

在这里插入图片描述

删除绑定

在这里插入图片描述

发布消息 ★

• 发布消息其实是把消息发送给指定的 Exchange, 再根据 Exchange 和 Queue 的 Binding 关系, 转发到对应队列中.
• 发送消息需要指定 routingKey, 这个值的作⽤和 ExchangeType 是相关的.
◦ Direct: routingKey 就是对应队列的名字. 此时不需要 binding 关系, 也不需要 bindingKey, 就可以直接转发消息.
◦ Fanout: routingKey 不起作⽤, bindingKey 也不起作⽤. 此时消息会转发给绑定到该交换机上的所有队列中.
◦ Topic: routingKey 是⼀个特定的字符串, 会和 bindingKey 进⾏匹配. 如果匹配成功, 则发到对应的队列中. 具体规则后续介绍.
• BasicProperties 是消息的元信息. body 是消息本体.

    // 发送消息到指定的交换机/队列中.public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机的名字exchangeName = virtualHostName + exchangeName;// 2. 检查 routingKey 是否合法.if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);}// 3. 查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 4. 判定交换机的类型if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.String queueName = virtualHostName + routingKey;// 5. 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 6. 查找该队列名对应的对象MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}// 7. 队列存在, 直接给队列中写入消息sendMessage(queue, message);} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {// 1) 获取到绑定对象, 判定对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null) {// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());continue;}// 2) 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 3) 判定这个消息是否能转发给该队列.//    如果是 fanout, 所有绑定的队列都要转发的.//    如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.if (!router.route(exchange.getType(), binding, message)) {continue;}// 4) 真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败!");e.printStackTrace();return false;}}private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上.int deliverMode = message.getDeliverMode();// deliverMode 为 1 , 不持久化. deliverMode 为 2 表示持久化.if (deliverMode == 2) {diskDataCenter.sendMessage(queue, message);}// 写入内存memoryDataCenter.sendMessage(queue, message);// 此处还需要补充一个逻辑, 通知消费者可以消费消息了.consumerManager.notifyConsume(queue.getName());}

路由规则

1) 实现 route ⽅法

    public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {// 根据不同的 exchangeType 使用不同的判定转发规则.if (exchangeType == ExchangeType.FANOUT) {// 如果是 FANOUT 类型, 则该交换机上绑定的所有队列都需要转发return true;} else if (exchangeType == ExchangeType.TOPIC) {// 如果是 TOPIC 主题交换机, 规则就要更复杂一些.return routeTopic(binding, message);} else {// 其他情况是不应该存在的.throw new MqException("[Router] 交换机类型非法! exchangeType=" + exchangeType);}}

2) 实现 checkRoutingKeyValid

    public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为(俺)约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb    => 非法// 2. aaa.#.*.bbb    => 非法// 3. aaa.*.#.bbb    => 非法// 4. aaa.*.*.bbb    => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}

3) 实现 checkBindingKeyValid

    // bindingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为(俺)约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb    => 非法// 2. aaa.#.*.bbb    => 非法// 3. aaa.*.#.bbb    => 非法// 4. aaa.*.*.bbb    => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}

4) 实现 routeTopic

    // [测试用例]// binding key          routing key         result// aaa                  aaa                 true// aaa.bbb              aaa.bbb             true// aaa.bbb              aaa.bbb.ccc         false// aaa.bbb              aaa.ccc             false// aaa.bbb.ccc          aaa.bbb.ccc         true// aaa.*                aaa.bbb             true// aaa.*.bbb            aaa.bbb.ccc         false// *.aaa.bbb            aaa.bbb             false// #                    aaa.bbb.ccc         true// aaa.#                aaa.bbb             true// aaa.#                aaa.bbb.ccc         true// aaa.#.ccc            aaa.ccc             true// aaa.#.ccc            aaa.bbb.ccc         true// aaa.#.ccc            aaa.aaa.bbb.ccc     true// #.ccc                ccc                 true// #.ccc                aaa.bbb.ccc         trueprivate boolean routeTopic(Binding binding, Message message) {// 先把这两个 key 进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 引入两个下标, 指向上述两个数组. 初始情况下都为 0int bindingIndex = 0;int routingIndex = 0;// 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 forwhile (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {if (bindingTokens[bindingIndex].equals("*")) {// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!bindingIndex++;routingIndex++;continue;} else if (bindingTokens[bindingIndex].equals("#")) {// 如果遇到 #, 需要先看看有没有下一个位置.bindingIndex++;if (bindingIndex == bindingTokens.length) {// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!return true;}// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex == -1) {// 没找到匹配的结果. 匹配失败return false;}// 找到的匹配的情况, 继续往后匹配.bindingIndex++;routingIndex++;} else {// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex++;routingIndex++;}}// [情况五] 判定是否是双方同时到达末尾// 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失败的.if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if (routingTokens[i].equals(bindingToken)) {return i;}}return -1;}

5) 匹配规则测试⽤例

// [测试用例]
// binding key          routing key         result
// aaa                  aaa                 true
// aaa.bbb              aaa.bbb             true
// aaa.bbb              aaa.bbb.ccc         false
// aaa.bbb              aaa.ccc             false
// aaa.bbb.ccc          aaa.bbb.ccc         true
// aaa.*                aaa.bbb             true
// aaa.*.bbb            aaa.bbb.ccc         false
// *.aaa.bbb            aaa.bbb             false
// #                    aaa.bbb.ccc         true
// aaa.#                aaa.bbb             true
// aaa.#                aaa.bbb.ccc         true
// aaa.#.ccc            aaa.ccc             true
// aaa.#.ccc            aaa.bbb.ccc         true
// aaa.#.ccc            aaa.aaa.bbb.ccc     true
// #.ccc                ccc                 true
// #.ccc                aaa.bbb.ccc         true

6) 测试 Router

package com.example.mq;import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.ExchangeType;
import com.example.mq.mqserver.core.Message;
import com.example.mq.mqserver.core.Router;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class RouterTests {private Router router = new Router();private Binding binding = null;private Message message = null;@BeforeEachpublic void setUp() {binding = new Binding();message = new Message();}@AfterEachpublic void tearDown() {binding = null;message = null;}// [测试用例]// binding key          routing key         result// aaa                  aaa                 true// aaa.bbb              aaa.bbb             true// aaa.bbb              aaa.bbb.ccc         false// aaa.bbb              aaa.ccc             false// aaa.bbb.ccc          aaa.bbb.ccc         true// aaa.*                aaa.bbb             true// aaa.*.bbb            aaa.bbb.ccc         false// *.aaa.bbb            aaa.bbb             false// #                    aaa.bbb.ccc         true// aaa.#                aaa.bbb             true// aaa.#                aaa.bbb.ccc         true// aaa.#.ccc            aaa.ccc             true// aaa.#.ccc            aaa.bbb.ccc         true// aaa.#.ccc            aaa.aaa.bbb.ccc     true// #.ccc                ccc                 true// #.ccc                aaa.bbb.ccc         true@Testpublic void test1() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test2() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test3() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test4() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test5() throws MqException {binding.setBindingKey("aaa.bbb.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test6() throws MqException {binding.setBindingKey("aaa.*");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test7() throws MqException {binding.setBindingKey("aaa.*.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test8() throws MqException {binding.setBindingKey("*.aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test9() throws MqException {binding.setBindingKey("#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test10() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test11() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test12() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test13() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test14() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test15() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test16() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}
}

订阅消息

1) 添加⼀个订阅者

    // 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);e.printStackTrace();return false;}}

Consumer 相当于⼀个回调函数. 放到 common.Consumer 中

@FunctionalInterface
public interface Consumer {// Delivery 的意思是 "投递", 这个方法预期是在每次服务器收到消息之后, 来调用.// 通过这个方法把消息推送给对应的消费者.// (注意! 这里的方法名和参数, 也都是参考 RabbitMQ 展开的)void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}

2) 创建订阅者管理管理类

在这里插入图片描述
• parent ⽤来记录虚拟主机.
• 使⽤⼀个阻塞队列⽤来触发消息消费. 称为令牌队列. 每次有消息过来了, 都往队列中放⼀个令牌(也就是队列名), 然后消费者再去消费对应队列的消息.
• 使⽤⼀个线程池⽤来执⾏消息回调.

这样令牌队列的设定避免搞出来太多线程. 否则就需要给每个队列都安排⼀个单独的线程了, 如果队列很多则开销就⽐较⼤了

3) 添加令牌接⼝

    // 这个方法的调用时机就是发送消息的时候.public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}

4) 实现添加订阅者

• 新来订阅者的时候, 需要先消费掉之前积压的消息.
• consumeMessage 真正的消息消费操作, ⼀会再实现.

    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}

创建 ConsumerEnv , 这个类表⽰⼀个订阅者的执⾏环境

public class ConsumerEnv {private String consumerTag;private String queueName;private boolean autoAck;// 通过这个回调来处理收到的消息.private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}

给 MsgQueue 添加⼀个订阅者列表

在这里插入图片描述
此处的 chooseConsumer 是实现⼀个轮询效果. 如果⼀个队列有多个订阅者, 将会按照轮询的⽅式轮
流拿到消息

5) 实现扫描线程

在 ConsumerManager 中创建⼀个线程, 不停的尝试扫描令牌队列. 如果拿到了令牌, 就真正触发消费消息操作

    public ConsumerManager(VirtualHost p) {parent = p;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.scannerThread.setDaemon(true);scannerThread.start();}

6) 实现消费消息

    private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if (message == null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workerPool.submit(() -> {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.//    如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 1) 删除硬盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});}

注意: ⼀个队列可能有 N 个消费者, 此处应该按照轮询的⽅式挑⼀个消费者进⾏消费.

⼩结

⼀. 消费消息的两种典型情况

  1. 订阅者已经存在了, 才发送消息
    这种直接获取队列的订阅者, 从中按照轮询的⽅式挑⼀个消费者来调⽤回调即可.
  2. 消息先发送到队列了, 订阅者还没到.
    此时当订阅者到达, 就快速把指定队列中的消息全都消费掉.
    ⼆. 关于消息不丢失的论证
    每个消息在从内存队列中出队列时, 都会先进⼊ 待确认 中.
    • 如果 autoAck 为 true
    消息被消费完毕后(执⾏完消息回调之后), 再执⾏清除⼯作.
    分别清除硬盘数据, 待确认队列, 消息中⼼.
    • 如果 autoAck 为 false
    在回调内部, 进⾏清除⼯作.
    分别清除硬盘数据, 待确认队列, 消息中⼼.
  3. 执⾏消息回调的时候抛出异常
    此时消息仍然处在待确认队列中.
    此时可以⽤⼀个线程扫描待确认队列, 如果发现队列中的消息超时未确认, 则放⼊死信队列.
    死信队列咱们此处暂不实现.
  4. 执⾏消息回调的时候服务器宕机
    内存所有数据都没了, 但是消息在硬盘上仍然存在. 会在服务下次启动的时候, 加载回内存. 重新被消费到.

消息确认

在这里插入图片描述

测试 VirtualHost

package com.example.mq;import com.example.mq.common.Consumer;
import com.example.mq.mqserver.VirtualHost;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;@SpringBootTest
public class VirtualHostTests {private VirtualHost virtualHost = null;@BeforeEachpublic void setUp() {MqApplication.context = SpringApplication.run(MqApplication.class);virtualHost = new VirtualHost("default");}@AfterEachpublic void tearDown() throws IOException {MqApplication.context.close();virtualHost = null;// 把硬盘的目录删除掉File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);}@Testpublic void testExchangeDeclare() {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);}@Testpublic void testExchangeDelete() {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDelete("testExchange");Assertions.assertTrue(ok);}@Testpublic void testQueueDeclare() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);}@Testpublic void testQueueDelete() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDelete("testQueue");Assertions.assertTrue(ok);}@Testpublic void testQueueBind() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);}@Testpublic void testQueueUnbind() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);ok = virtualHost.queueUnbind("testQueue", "testExchange");Assertions.assertTrue(ok);}@Testpublic void testBasicPublish() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);}// 先订阅队列, 后发送消息@Testpublic void testBasicConsume1() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {try {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);} catch (Error e) {// 断言如果失败, 抛出的是 Error, 而不是 Exception!e.printStackTrace();System.out.println("error");}}});Assertions.assertTrue(ok);Thread.sleep(500);// 再发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);}// 先发送消息, 后订阅队列.@Testpublic void testBasicConsume2() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeFanout() throws InterruptedException {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue1", "testExchange", "");Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue2", "testExchange", "");Assertions.assertTrue(ok);// 往交换机中发布一个消息ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 两个消费者订阅上述的两个队列.ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeTopic() throws InterruptedException {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");Assertions.assertTrue(ok);ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicAck() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列 [要改的地方, 把 autoAck 改成 false]ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);// [要改的地方, 新增手动调用 basicAck]boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());Assertions.assertTrue(ok);}});Assertions.assertTrue(ok);Thread.sleep(500);}
}

相关文章:

✔ ★Java项目——设计一个消息队列(五)【虚拟主机设计】

虚拟主机设计 创建 VirtualHost实现构造⽅法和 getter创建交换机删除交换机创建队列删除队列创建绑定删除绑定发布消息 ★路由规则1) 实现 route ⽅法2) 实现 checkRoutingKeyValid3) 实现 checkBindingKeyValid4) 实现 routeTopic5) 匹配规则测试⽤例6) 测试 Router 订阅消息1…...

ntfs文件系统的优势 NTFS文件系统的特性有哪些 ntfs和fat32有什么区别 苹果电脑怎么管理硬盘

对于数码科技宅在新购得磁盘之后&#xff0c;出于某种原因会在新的磁盘安装操作系统。在安装操作系统时&#xff0c;首先要对磁盘进行分区和格式化&#xff0c;而在此过程中&#xff0c;操作者们需要选择文件系统。文件系统也决定了之后操作的流程程度&#xff0c;一般文件系统…...

Python Web框架Django项目开发实战:创建在线学习应用

注意&#xff1a;本文的下载教程&#xff0c;与以下文章的思路有相同点&#xff0c;也有不同点&#xff0c;最终目标只是让读者从多维度去熟练掌握本知识点。 下载教程&#xff1a;Python项目开发Django实战-创建在线学习应用-编程案例解析实例详解课程教程.pdf 在当今数字化教…...

用得助全媒体呼叫中心,让AI落到实处帮品牌做营销

怎么让人工智能落到实处的帮助到我们&#xff1f;我们今天来讲讲中关村科金得助全媒体呼叫中心是怎么让AI帮品牌。 这次聊的案例是知名的护肤品牌&#xff0c;该品牌在中国功能性护肤品市场占有率达到20.5%&#xff0c;这么高的市场占有率客户的咨询量也是非常庞大的&#xff0…...

【吃透Java手写】2-Spring(下)-AOP-事务及传播原理

【吃透Java手写】Spring&#xff08;下&#xff09;AOP-事务及传播原理 6 AOP模拟实现6.1 AOP工作流程6.2 定义dao接口与实现类6.3 初始化后逻辑6.4 原生Spring的方法6.4.1 实现类6.4.2 定义通知类&#xff0c;定义切入点表达式、配置切面6.4.3 在配置类中进行Spring注解包扫描…...

Spring原理分析--获取Environment资源对象

1.使用getEnvironment()获取环境信息 ApplicationContext接口继承了EnvironmentCapable接口&#xff0c;可以通过getEnvironment()获取Environment配置信息&#xff0c;例如&#xff1a; SpringBootApplication public class A01 {public static void main(String[] args) th…...

Android GPU渲染SurfaceFlinger合成RenderThread的dequeueBuffer/queueBuffer与fence机制(2)

Android GPU渲染SurfaceFlinger合成RenderThread的dequeueBuffer/queueBuffer与fence机制&#xff08;2&#xff09; 计算fps帧率 用 adb shell dumpsys SurfaceFlinger --list 查询当前的SurfaceView&#xff0c;然后有好多行&#xff0c;再把要查询的行内容完整的传给 ad…...

人民币数字和中文汉字转换

在PHP中&#xff0c;将人民币的中文汉字金额转换为数字&#xff0c;或者将数字转换为人民币的中文汉字金额&#xff0c;通常需要自定义一些函数来实现这一转换过程。下面分别给出这两个转换的示例代码。 数字转人民币中文汉字 function numberToChinese($num) { $cnNums arr…...

07_Flutter使用NestedScrollView+TabBarView滚动位置共享问题修复

07_Flutter使用NestedScrollViewTabBarView滚动位置共享问题修复 一.案发现场 可以看到&#xff0c;上图中三个列表的滑动位置共享了&#xff0c;滑动其中一个列表&#xff0c;会影响到另外两个&#xff0c;这显然不符合要求&#xff0c;先来看下布局&#xff0c;再说明产生这个…...

Java解决垂直鉴权问题(对垂直权限进行校验)

Java解决垂直鉴权问题&#xff08;对垂直权限进行校验&#xff09; 文章目录 Java解决垂直鉴权问题&#xff08;对垂直权限进行校验&#xff09;前言一、垂直鉴权是什么&#xff1f;二、实现过程1.新建接口权限菜单映射表2.项目初始化时加载接口菜单映射关系3.自定义过滤器拦截…...

【MySQL工具】pt-heartbeat

功能 pt-heartbeat - 监控 MySQL 复制延迟。 用法 pt-heartbeat [OPTIONS] [DSN] --update|--monitor|--check|--stop pt-heartbeat 用于测量 MySQL 或 PostgreSQL 服务器上的复制延迟。您可以使用它来更新主服务器或监控从服务器。如果可能&#xff0c;MySQL 连接选项将从您…...

实现vant的年月日时分秒组件

方法&#xff1a;van-datetime-picker&#xff08;type&#xff1a;datetime&#xff09;和 van-picker结合实现。 <template><div class"datetimesec-picker"><van-datetime-pickerref"timePickerRef"type"datetime" //年月日时…...

typescript 命名空间、装饰器

1、命名空间 命名空间&#xff1a;在代码量较大的情况下&#xff0c;为了避免各种变量命名的冲突&#xff0c;可将相似功能的函数、类、接口等放置到命名空间内。同Java的包.Net的命名空间一样&#xff0c;typescript 的命名空间可以将代码包裹起来&#xff0c;只对外暴露需要在…...

GPT问答SAP BW

以下回答由GPT-3.5回答,仅供参考. 这个AI工具超好用&#xff0c;每天都有免费额度&#xff0c;写文章、总结长视频、画图等&#xff0c;都几秒搞定&#xff01;快去下载Sider Chrome或Edge插件&#xff0c;薅羊毛&#xff01; https://sider.ai/invited?c43b289bf2616575daecf…...

使用zdppy_amauth开发激活用户接口

服务端代码&#xff1a; 1、创建数据库连接对象2、初始化数据库3、声明一个上下文4、挂载用户相关的路由&#xff0c;这里主要由 用户登录接口用户注册注册获取用户列表接口激活指定用户接口 5、启动服务 import mcrud import api import amauth import env import contextli…...

c++ memset 指针示例

目录 C 传一个float指针&#xff0c;在函数内部修改指针的值 c memset 指针示例 C 传一个float指针&#xff0c;在函数内部修改指针的值 #include <iostream>// 定义一个函数&#xff0c;它接受一个指向float的指针 void modifyValue(float* ptr) {// 通过解引用指针来…...

24考研双非上岸武汉理工大学电子信息专硕,855考研经验

目录 一、考研择校经验 二、武理考研初试经验 三、武理考研复试经验 一、考研择校经验 我建议学弟学妹们确定院校时没必要一上来就说我一定要考某个院校。其实考哪个学校是要在考研备考的过程中慢慢探索&#xff0c;慢慢研究的&#xff0c;不过最晚9月初一定要确定院校了&a…...

使用KubeKey 快速交付k8s v1.28.8集群

文章目录 服务器配置使用kubekey部署k8s1. 操作系统基础配置2. 安装部署 K8s2.1 下载 KubeKey2.2 创建 K8s 集群部署配置文件 3. 验证 K8s 集群3.1 验证集群状态 4. 部署测试资源5.验证服务 服务器配置 主机名IPCPU内存系统盘数据盘用途vm-16-11-ubuntu192.168.9.131128256Gi5…...

nginx--压缩https证书favicon.iconginx隐藏版本号 去掉nginxopenSSL

压缩功能 简介 Nginx⽀持对指定类型的⽂件进行压缩然后再传输给客户端&#xff0c;而且压缩还可以设置压缩比例&#xff0c;压缩后的文件大小将比源文件显著变小&#xff0c;这样有助于降低出口带宽的利用率&#xff0c;降低企业的IT支出&#xff0c;不过会占用相应的CPU资源…...

通俗的理解网关的概念的用途(四):什么是网关设备?(网络层面)

任何一台Windows XP操作系统之后的个人电脑、Linux操作系统电脑都可以简单的设置&#xff0c;就可以成为一台具备“网关”性质的设备&#xff0c;因为它们都直接内置了其中的实现程序。MacOS有没有就不知道&#xff0c;因为没用过。 简单的理解&#xff0c;就是运行了具备第二…...

Spring JdbcTemplate实现自定义动态sql拼接功能

需求描述&#xff1a; sql 需要能满足支持动态拼接&#xff0c;包含 查询字段、查询表、关联表、查询条件、关联表的查询条件、排序、分组、去重等 实现步骤&#xff1a; 1&#xff0c;创建表及导入测试数据 CREATE TABLE YES_DEV.T11 (ID BINARY_BIGINT NOT NULL,NAME VARCH…...

第十一篇:操作系统新纪元:智能融合、量子跃迁与虚拟现实的交响曲

操作系统新纪元&#xff1a;智能融合、量子跃迁与虚拟现实的交响曲 1 引言 在数字化的浪潮中&#xff0c;操作系统如同一位智慧的舵手&#xff0c;引领着信息技术的航船穿越波涛汹涌的海洋。随着人工智能、物联网、量子计算等前沿技术的蓬勃发展&#xff0c;操作系统正站在一个…...

【大数据】学习笔记

文章目录 [toc]NAT配置IP配置SecureCRT配置PropertiesTerminal Java安装环境变量配置 Hadoop安装修改配置文件hadoop-env.shyarn-env.shslavescore-site.xmlhdfs-site.xmlmapred-site.xmlyarn-site.xml 环境变量配置 IP与主机名映射关系配置hostname配置映射关系配置 关闭防火墙…...

PHP 框架安全:ThinkPHP 序列 漏洞测试.

什么是 ThinkPHP 框架. ThinkPHP 是一个流行的国内 PHP 框架&#xff0c;它提供了一套完整的安全措施来帮助开发者构建安全可靠的 web 应用程序。ThinkPHP 本身不断更新和改进&#xff0c;以应对新的安全威胁和漏洞。 目录&#xff1a; 什么是 ThinkPHP 框架. ThinkPHP 框架…...

厂家自定义 Android Ant编译流程源码分析

0、Ant安装 Windows下安装Ant&#xff1a; ant 官网可下载 http://ant.apache.org ant 环境配置&#xff1a; 解压ant的包到本地目录。 在环境变量中设置ANT_HOME&#xff0c;值为你的安装目录。 把ANT_HOME/bin加到你系统环境的path。 Ubuntu下安装Ant&#xff1a; sudo apt…...

基于springboot+vue+Mysql的体质测试数据分析及可视化设计

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…...

uniapp的app端推送功能,不使用unipush

1&#xff1a;推送功能使用htmlPlus实现&#xff1a;地址HTML5 API Reference (html5plus.org) 效果图&#xff1a; 代码实现&#xff1a; <template><view class"content"><view class"text-area"><button click"createMsg&q…...

数据结构(四)————二叉树和堆(中)

制作不易&#xff0c;三连支持一下呗&#xff01;&#xff01;&#xff01; 文章目录 前言一、堆的概念及结构二、堆的实现三.堆的应用 总结 前言 CSDN 这篇博客介绍了二叉树中的基本概念和存储结构&#xff0c;接下来我们将运用这些结构来实现二叉树 一、堆的概念及结构 1…...

随便写点东西

1 react的高阶组件 1.1 操纵组件的props、对组件的props进行增删&#xff1b; 1.2 复用组件逻辑 服用的组件逻辑&#xff0c;互不影响&#xff1b;比如高阶组件中复用了input框&#xff0c;输入内容是互不影响的&#xff1b; 1.3 可以通过配置装饰器来实现高阶组件&#xff08…...

Mac 报错 Zsh: command not found :brew

Mac 安装其他命令时报错 Zsh: command not found :brew终于找到一个能行的&#xff0c;还能够配置国内下载源&#xff0c;记录一下 执行 /bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)"选择一个开始继续执行即可...