模拟实现消息队列(基于SpringBoot实现)
提要:此处的消息队列是仿照RabbitMQ实现(参数之类的),实现一些基本的操作:创建/销毁交互机(exchangeDeclare,exchangeDelete),队列(queueDeclare,queueDelete),绑定(queueBind,queueUnbind),发送消息(basicPublish),订阅消息(basicConsume),手动确认消息(basicAck),并且只实现了单机系统下的消息队列,下面是具体实现过程。
ps:这里的创建采用的是Declare而不用create,意义在如果没有存在就创建,存在的话就不管。
1.思路
消息队列服务器本体的两个主要核心类:VirtualHost(表示一个虚拟主机,该主机管理着当前主机上的交换机,队列,绑定和为上层api(BrokerServer)提供调用)和BrokerServer(用来接受消费者和生产者的请求,并且根据请求(这里采用自定义的应用层协议)的类型来进行处理请求,并返回响应)。
客户端这边主要的两个核心类是:Connection和Channel,由于客户端(包括消费者和生产者)要想和服务器进行可靠的通信就需要进行一些比较复杂的连接过程,如果每一次连接创建完成后,将请求传输过去,服务器这边接收到了请求,并且通过当前连接返回响应,然后就将连接断开,那么当有很多请求来临的时候,每一个请求都要建立一个连接,这样开销的话会很大,所以这里引入Channel(频道),一个Connection(每一个客户端只需要建立一个连接就可以完成后续的所有请求)中有多个Channel,每次发送一个请求都是通过新建一个Channel,此时不需要进行复杂的可靠连接操作(Connection已经完成这部分的内容),所以此时的效率提高,销毁连接时的开销也减少了。
存储数据的方式是采用内存+硬盘,硬盘上的数据存储是为了消息队列在重启或者掉电的时候,能够恢复到之间的数据状态,在进行上述消息队列功能的时候都是在进行操作内存,这样能降低IO的读取次数,从而提高效率。
2.具体实现
ps:由于是客户端和服务器之间是通过网络进行通信,传输的数据是二进制的,所以需要对类进行序列化和反序列化(也就是实现Serializable接口)
(1)数据在硬盘上存储:
①数据类型:
这里采用的数据库不是MySql而是一个更轻量的数据库SQLite,由于此处只需要进行简单的创建表,存储和查询所有结果的操作,并且Mybaits也是支持的,所以用SQLite的话更适合一点。
在这个消息队列里面可能(可以手动传入参数来判定是否要持久化存储)需要持久化存储的数据有Message(消息,没有使用SQLite存储,后面重点介绍),Exchange(交换机),MSGQueue(消息队列),Binding(绑定)
a.Exchange:
这里为啥会引入交换机?直接将信息发送给队列不就好了吗?
RabbitMQ里面就有这个概念,先将消息发送给交换机,然后通过根据交换机的类型将消息发送队列。交互机不是具体的功能的实现的地点只是进行消息的转发,就相当于公司的前台。
一个交换机可以绑定多个队列,但是一个队列只能绑定一个交换机,这里就涉及到交换机与队列之间的对应关系。同时不同的交换机类型所转发的队列也不同,在交换机这里涉及两个关键的变量:bindingKey(交换机和队列进行绑定的字符串)和routingKey(发送消息到交换机指定的字符串)
交换机这里有实现三种类型:
直接交换机(DIRECT):将routingKey作为队列名称进行转发消息,也就是转发消息给以routingKey为名字的队列,此时bindingKey对直接交换机来说没有作用
扇出交换机(FANOUT):与当前交换机绑定的所有队列都转发消息。
主题交换机(TOPIC):当前bindingKey和routingKey对应上才能进行转发。
交换机里面主要有五种参数:name(相当于每一个交换机的身份标识),type(交换机的类型,可以用一个枚举类来实现),durable(是否持久化存储),autoDelete(是否在不进行使用的时候进行删除),arguments(扩展项,是一个Map<String,Object>)
关于arguments在数据库中的存储:(后续相关类中也会采用该种形式)
在数据库中arguments是采用varchar(变长字符串)的类型存储的,所以在进行存储的要对Map进行转化,转化成json字符串进行存储,而Mybaits调用java对象进行数据存储的时候是会调用对应变量的get方法,所以此时需要借用ObjectMapper类来进行将Map到json字符串的转化。
Exchange类代码实现:
@Data
public class Exchange implements Serializable {//使用name来作为交换机的唯一身份标识private String name;//交换机类型private ExchangeType type = ExchangeType.DIRECT;//是否要持久化存储,true表示需要,false表示不需要private boolean durable = false;//如果没有人用了,当前交换机是否要进行删除,这部分在后续代码中并没有实现(扩展内容)private boolean autoDelete = false;//额外扩展项private Map<String,Object> arguments = new HashMap<>();//使用下面类进行json字符串和java对象的转化private ObjectMapper objectMapper = new ObjectMapper();public String getArguments() {try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}//下面的get和set方法就是在进行Map对象和json字符串的转化public void setArguments(String jsonArguments) {try {arguments = objectMapper.readValue(jsonArguments, new TypeReference<Map<String,Object>>(){});} catch (JsonProcessingException e) {e.printStackTrace();}}public Object getArguments(String key) {return arguments.get(key);}
}
ps:代码中使用 @Data注解是lombok下的一个工具类,会实现当前类中所有变量的get和set方法。
ExchangeType类的具体实现:
public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private ExchangeType(int type) {}
}
b.MSGQueue:
当交换机类型确定后,就根据类型的不同进行转发消息到对应队列中。当前这个类只是存储一个队列名,而消息的具体存储是在硬盘上(文件中)。队列中主要有这几个参数:name(队列名),durable(是否持久化存储),autoDelete(是否开启自动删除),exclusive(是否独占,这个参数与后续消费者消费消息有关系,如果为false,表示当前队列中消息所有队列中的消费者都可以消费,如果为true,表示当前队列中的消息只能有当前队列中的消费者能够消费),arguments(扩展项,也需要进行json字符串转化),Consumer(代表消费者,后续会介绍),ArrayList(用来存放当前队列中的所有消费者),ConsumerSeq(一个atomicInteger变量,在多线程环境下变量也能够进行原子的操作,方便进行轮询,这也是采用顺序表的原因)
MSGQueue类代码实现:(关于消费者相关的在后续会介绍)
@Data
public class MSGQueue implements Serializable {//表示当前队列的唯一身份标识private String name;//是否持久化private boolean durable;//自动删除,队列没有人用是否删除private boolean autoDelete;//是否要独占,如果为true,表示当队列只能一个消费者使用,为false,表示多个消费者都可以使用//这个功能也暂时不实现(RabbitMQ中有的)private boolean exclusive;//由于如果要存进数据库,所以这里要将Map类型进行转化成json字符串,然后存储进去//Mybaits操作数据库,是调用java对象的getter方法,然后赋给对应字段//表示扩展的一些参数,先列在这里,后续如果要用,可以随时添加(RabbitMQ中有的)private Map<String,Object> arguments = new HashMap<>();//当前队列中管理哪些消费者private List<ConsumerEnv> consumerEnvList = new ArrayList<>();//当前取到了第几个消费者,方便进行轮策略(考虑到线程安全问题,可以直接使用atomic原子类)private AtomicInteger consumerSeq = new AtomicInteger(0);//使用下面类进行json字符串和java对象的转化private ObjectMapper objectMapper = new ObjectMapper();public String getArguments() {try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public void setArguments(String jsonArguments) {try {objectMapper.readValue(jsonArguments, new TypeReference<Map<String,Object>>() {});} catch (JsonProcessingException e) {throw new RuntimeException(e);}}//添加消费者,是在消费者订阅消息时所添加的public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}//通过轮询的方式选出一个消费者进行消费消息public ConsumerEnv chooseConsumerEnvExclusive() {if(consumerEnvList.size() == 0) {//当前队列没人订阅return null;}int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.incrementAndGet();return consumerEnvList.get(index);}
}
c.Binding:
表示当前交换机绑定了哪些队列,主要有三个参数:exchangeName,queueName,bindingKey
关于Binding是否进行持久化存储,关键看当前对应的交换机和队列是否进行了持久化存储,如果其中有一个没有进行持久化存储,那么此时对Binding进行持久化存储是没有意义的(所以在进行代码编写的时候对于创建绑定这部分关于是否要进行持久化存储需要进行一定的判断)
Binding类代码实现:
@Data
public class Binding {//交换机名private String exchangeName;//队列名private String queueName;//绑定字符串private String bindingKey;
}
d.Message:
用当前类来表示一个消息,主要有下列几个参数:BasicProperties(每一个消息都拥有的基本属性:messageId(可以通过UUID类来生成唯一的id),routingKey,deliveryMode(1表示不持久化,2表示持久化)),body(消息的本体,是一个byte数组),offsetBeg和offsetEnd(由于当前传输的数据是二进制数据,容易产生粘包问题,所以需要引入两个偏移量来表示,当前消息从哪到哪表示一个完整的消息),isValid(表示当前消息是否有效,这里消息的删除是采用的逻辑删除的方式,并不是真正的删除,只是后续在进行垃圾回收(后续会介绍)的时候,才会回收这些被逻辑删除的消息。0x1表示有效,0x0表示无效)
在该类中还提供了一个生成消息(createMessageWithId)的工厂方法(里面messageId就是通过UUID生成的),它是一个静态方法。
Message类具体实现:
@Data
public class Message implements Serializable {//这两个属性是Message的核心属性private BasicProperties basicProperties;//这里使用byte类型的正文,是为了能够序列化和反序列化private byte[] body;//下面是辅助的属性//Message后续也会持久化存储,所以需要找到对应的正文部分//由于是使用字节来传输数据,所以需要知道起始偏移量和结束偏移量才能知道从哪到哪是一个完整的正文//使用下列两个偏移量来进行表示:[offsetBeg,offsetEnd)private long offsetBeg = 0;private long offsetEnd = 0;//使用这个属性表示该消息在文件中是否为有效信息(这里是使用逻辑删除的方式来进行删除消息)//0x1表示存在,0x0表示不存在private byte isValid = 0x1;//这里创建一个工厂方法(不使用构造方法封装对象了),可以直接通过调用该方法,来直接获得已经封装好的message对象//使用UUID生成唯一idpublic static Message createMessageWithId(BasicProperties basicProperties,byte[] body) {Message message = new Message();if(basicProperties != null) {//以T-作为前缀basicProperties.setMessageId("T-" + UUID.randomUUID());message.setBasicProperties(basicProperties);}if(body != null) {message.setBody(body);}return message;}
}
②具体存储形式:
Exchange,MSGQueue,Binding是放在SQLite数据库中,而Message是放在对应文件中的。这里有几个关键的类:DataBaseManage,MessageFileManage,DiskDataManage
a.DataBaseManage
SQLite数据库上的相关操作的封装。由于此次操作数据使用的是Mybaits,所以操作数据库上数据是放在MetaMapper(方法调用)和MetaMapper.xml(具体实现,且当前xml文件的位置需要再application.yml中进行配置)上的。
MetaMapper代码具体实现:
@Mapper
public interface MetaMapper {//持久化存储:表的创建void createTableExchange();void createTableBinding();void createTableQueue();//针对上述三个表,进行插入和删除void insertExchange(Exchange exchange);void deleteExchange(String exchangeName);List<Exchange> selectExchanges();void insertBinding(Binding binding);void deleteBinding(Binding binding);List<Binding> selectBindings();void insertQueue(MSGQueue queue);void deleteQueue(String QueueName);List<MSGQueue> selectQueues();
}
ps:需要引入Mybaits的依赖,并且在相关类上实现@Mapper注解。
MetaMapper.xml具体实现:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mq.mqserver.mapper.MetaMapper"><update id="createTableExchange">create table if not exists exchange(name varchar(50) primary key,type varchar(20),durable boolean,autoDelete boolean,arguments varchar(1024));</update><update id="createTableBinding">create table if not exists binding(exchangeName varchar(50),queueName varchar(50),bindingKey varchar(1024));</update><update id="createTableQueue">create table if not exists queue(name varchar(50) primary key,durable boolean,autoDelete boolean,exclusive boolean,arguments varchar(1024));</update><insert id="insertExchange" parameterType="com.example.mq.mqserver.core.Exchange">insert into exchange values (#{name},#{type},#{durable},#{autoDelete},# {arguments});</insert><delete id="deleteExchange">delete from exchange where name = #{exchangeName};</delete><select id="selectExchanges" resultType="com.example.mq.mqserver.core.Exchange">select * from exchange;</select><insert id="insertBinding" parameterType="com.example.mq.mqserver.core.Binding">insert into binding values(#{exchangeName},#{queueName},#{bindingKey});</insert><delete id="deleteBinding">delete from binding where queueName = #{queueName}and exchangeName = #{exchangeName}and bindingKey = #{bindingKey};</delete><select id="selectBindings" resultType="com.example.mq.mqserver.core.Binding">select * from binding;</select><insert id="insertQueue" parameterType="com.example.mq.mqserver.core.MSGQueue">insert into queue values (#{name},#{durable},#{autoDelete},#{exclusive},# {arguments});</insert><delete id="deleteQueue" >delete from queue where name = #{QueueName};</delete><select id="selectQueues" resultType="com.example.mq.mqserver.core.MSGQueue">select * from queue;</select>
</mapper>
ps:创建表的操作是放在<update>标签中实现的。 其他的标签和使用MySQL的时候相同。
在DataManage类中,不仅对上述操作数据库的操作进行了封装,还对数据库文件进行了创建(具体的方法是init,通过该方法完成数据库的创建,数据库中表的创建,和数据库中默认数据的添加(主要是exchange的添加,仿照RabbitMQ来的))。
DataBaseManage类代码具体实现:
@Slf4j
public class DataBaseManage {private MetaMapper metaMapper;//创建数据库和表public void init() {metaMapper = MqApplication.context.getBean(MetaMapper.class);//判断数据库是否存在if(!isExistsDataBase()) {File file = new File("./data/meta.db");boolean ret = file.mkdirs();if(ret) {log.info("创建数据库成功");}else {log.info("创建数据库失败");}//创建表createTable();//添加默认数据createDefaultData();log.info("数据库初始化已完成");}else {log.info("数据库已存在");}}public void deleteDB() {//删除数据库中的文件File file = new File("./data/meta.db");log.info(file.getAbsolutePath());boolean ret = file.delete();if (ret) {log.info("删除数据库中文件成功");}else {log.info("删除数据库中文件失败");}//删除目录file = new File("./data");log.info(file.getAbsolutePath());ret = file.delete();if(ret) {log.info("删除目录成功");}else {log.info("删除目录失败");}}//此处主要是添加一个默认的交换机//构造一个DIRECT类型的交换机//在RabbitMQ中有一个设定:带有一个匿名的交换机,类型是DIRECTprivate void createDefaultData() {Exchange exchange = new Exchange();exchange.setName("");exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);log.info("插入exchange成功");}private void createTable() {metaMapper.createTableQueue();metaMapper.createTableBinding();metaMapper.createTableExchange();log.info("[createTable] 创建表完成");}private boolean isExistsDataBase() {File file = new File("./data/meta.db");return file.exists();}public void insertExchange(Exchange exchange) {metaMapper.insertExchange(exchange);}public void deleteExchange(String exchangeName) {metaMapper.deleteExchange(exchangeName);}public List<Exchange> selectExchanges() {return metaMapper.selectExchanges();}public void insertBinding(Binding binding) {metaMapper.insertBinding(binding);}public void deleteBinding(Binding binding) {metaMapper.deleteBinding(binding);}public List<Binding> selectBindings() {return metaMapper.selectBindings();}public void insertQueue(MSGQueue msgQueue) {metaMapper.insertQueue(msgQueue);}public void deleteQueue(String queueName) {metaMapper.deleteQueue(queueName);}public List<MSGQueue> selectQueues() {return metaMapper.selectQueues();}
}
ps:@Slf4j是日志打印注解,也是lombok工具类中的一个注解。
b .MessageFileManage
每个队列中的消息的写入,取出,删除,垃圾回收都是通过这个类来完成的。在这个类中消息的二进制形式的,所以在消息的写入和读出都需要用到Stream相关类。主要有InputStream/OutputStream,DataInputStream/DataOutputStream(继承于InputStream/OutputStream,让读和写的方法更多,不需要关注过多的数据类型转换),RandomAccessFile(可以在任意位置修改文件中的数据)
该类需要维护的文件有两个:queue_data.txt(message真实存储的地方,二进制的形式存储),queue_stat.txt(当前queue_data文件中总消息个数和有效消息个数,以便后续进行垃圾回收,形如:2000\t1000,2000:总消息个数,1000有效消息个数)
MessageFileManage代码具体实现:
@Slf4j
public class MessageFileManage {public void init() {}//维护一个静态内部类Stat:表示文件之中总数和有效数public static class Stat {private int totalCount;private int validCount;public int getTotalCount() {return totalCount;}public void setTotalCount(int totalCount) {this.totalCount = totalCount;}public int getValidCount() {return validCount;}public void setValidCount(int validCount) {this.validCount = validCount;}}//获取消息队列的文件路径private String getQueueDir(String queueName) {return "./data/" + queueName;}//获取消息队列中消息的文件路径private String getQueueDataDir(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}//获取消息中数据的文件路径//由于是一个二进制文件,采用删除文件中消息的方式是逻辑删除(并不是真正的删除)// 所以需要单独维护一个文件记录当前文件总消息数和有效消息数,以便在总消息数达到多少(拍脑门拍出来的),// 且有效消息数占到多少是触发回收(这里是复制算法,单独搞一个文件,把有效消息拷贝过去,然后直接删除掉之前文件中的所有消息)//上述垃圾回收的过程是比较消耗cpu资源的,所以不能频繁发生(具体情况具体分析)。//约定文件中内容格式为:总数\t有效数private String getQueueStatDir(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}//读取queue_data.txt文件中数据,并且赋值给statprivate Stat readStat(String queueName) {Stat stat = new Stat();//直接使用Scanner进行读取就好try(InputStream inputStream = new FileInputStream(getQueueStatDir(queueName))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {throw new RuntimeException(e);}}//向queue_data.txt文件写入数据(stat)private void writeStat(String queueName,Stat stat) {try(OutputStream outputStream = new FileOutputStream(getQueueStatDir(queueName))) {PrintWriter writer = new PrintWriter(outputStream);writer.write(stat.totalCount + "\t" + stat.validCount);writer.flush();} catch (IOException e) {throw new RuntimeException(e);}}//创建队列目录和文件public void createQueueFiles(String queueName) throws IOException {//1.创建队列File queueDir = new File(getQueueDir(queueName));if(!queueDir.exists()) {//还有个上级目录:data目录,所以用mkdirsboolean ok = queueDir.mkdirs();if(!ok) {log.warn("创建文件失败");throw new IOException("[createQueueFiles] 创建队列失败,queueDir:" + queueDir.getAbsolutePath());}}//2.创建消息文件File queueDataDir = new File(getQueueDataDir(queueName));if(!queueDataDir.exists()) {boolean ok = queueDataDir.createNewFile();if(!ok) {log.warn("创建文件失败");throw new IOException("[createQueueFiles] 创建消息文件失败,queueDataDir:" + queueDataDir.getAbsolutePath());}}//3.创建消息统计文件File queueStatDir = new File(getQueueStatDir(queueName));if(!queueStatDir.exists()) {boolean ok = queueStatDir.createNewFile();if(!ok) {log.warn("创建文件失败");throw new IOException("[createQueueFiles] 创建消息统计文件失败, queueStatDir:" + queueStatDir.getAbsolutePath());}}//4.给消息统计文件赋初始值Stat stat = new Stat();stat.validCount = 0;stat.totalCount = 0;writeStat(queueName,stat);log.info("创建文件成功");}//删除队列对应目录和文件//如果队列删除了,那么队列中的文件都会被删除public void deleteQueue(String queueName) throws IOException {//先删除里面文件File queueDataFile = new File(getQueueDataDir(queueName));if(queueDataFile.exists()) {boolean ok = queueDataFile.delete();if(!ok) {log.warn("删除文件失败");throw new IOException("删除文件失败");}}File queueStatFile = new File(getQueueStatDir(queueName));if(queueStatFile.exists()) {boolean ok = queueStatFile.delete();if(!ok) {log.warn("删除文件失败");throw new IOException("删除文件失败");}}File queueDir = new File(getQueueDir(queueName));if(queueDir.exists()) {boolean ok = queueDir.delete();if(!ok) {log.warn("删除文件失败");throw new IOException("删除文件失败");}}log.info("删除文件成功");}//检查当前队列目录和文件是否存在private boolean checkFileExist(String queueName) {File queueDir = new File(getQueueDir(queueName));if(!queueDir.exists()) {return false;}File queueDataDir = new File(getQueueDataDir(queueName));if(!queueDataDir.exists()) {return false;}File queueStatDir = new File(getQueueStatDir(queueName));if(!queueStatDir.exists()) {return false;}return true;}//这个方法是将message写入queue中public void writeMessage(MSGQueue queue, Message message) throws IOException {//1.检查当前队列目录和文件是否存在if(!checkFileExist(queue.getName())) {throw new IOException("队列或队列中文件不存在,queueName:" + queue.getName());}//2.将message序列化成二进制数据byte[] messageBinary = BinaryTool.toBytes(message);//对当前队列进行加锁,防止在多线程环境下出现线程安全问题synchronized(queue) {//3.设置message中的offsetBeg和offsetEndFile queueDataFile = new File(getQueueDataDir(queue.getName()));message.setOffsetBeg(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + messageBinary.length + 4);messageBinary = BinaryTool.toBytes(message);try(OutputStream outputStream = new FileOutputStream(getQueueDataDir(queue.getName()),true)) {try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {//约定以下面格式进行写入//先写入当前bytes的长度//再写入具体的bytes(二进制数据)dataOutputStream.writeInt(messageBinary.length);dataOutputStream.write(messageBinary);}}//4.更新统计文件中的数据Stat stat = readStat(queue.getName());//可能出现线程安全问题stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(),stat);}}//只是进行逻辑删除,触发垃圾回收时,才真正进行删除public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException {if (!checkFileExist(queue.getName())) {throw new IOException("队列或队列中文件不存在,queueName:" + queue.getName());}synchronized (queue) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataDir(queue.getName()), "rw")) {//1.先从文件中把对应message的信息读取出来//构造一个bytes数组,长度为message的lengthbyte[] bytes = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];//移动光标到offsetBeg为止randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bytes);//2.反序列化成java对象Message message1 = (Message) BinaryTool.fromBytes(bytes);message1.setIsValid((byte) 0x0);//3.在序列化成二进制数据写入文件中//注意光标的位置randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(BinaryTool.toBytes(message1));}//可以看到上述一顿操作也只是改变了文件中的一个字节//4.更新统计文件中的数据Stat stat = readStat(queue.getName());if (stat.validCount > 0) {stat.validCount -= 1;}writeStat(queue.getName(), stat);}}//加载队列中的所有消息public LinkedList<Message> loadAllMessagesFromQueue(String queueName) throws ClassNotFoundException, IOException {LinkedList<Message> messages = new LinkedList<>();try(InputStream inputStream = new FileInputStream(getQueueDataDir(queueName))){try(DataInputStream dataInputStream = new DataInputStream(inputStream)) {//通过下面一个while循环就可以将文件读完//到文件末尾会抛出EOFException异常//可以通过捕获该异常来表示已经读取到文件末尾long currentOffset = 0;while(true) {//1.读取message的长度int messageSize = dataInputStream.readInt();//2.读取message的内容byte[] buffer = new byte[messageSize];if(buffer.length != messageSize) {throw new MqException("[MessageLoad]文件读取有误,queueName:" + queueName);}dataInputStream.read(buffer);//3.反序列化成message对象Message message = (Message) BinaryTool.fromBytes(buffer);//判断是否为有效数据if(message.getIsValid() == 0x0) {currentOffset += (4 + messageSize);continue;}//4.设置当前message的偏移量message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {//如果发生了该异常,说明文件已经读取完成//只需要捕获一下当前异常,然后说明一下读取完成log.info("文件读取完成");}}return messages;}//检查当前是否要进行GC//回收规则是:当总数达到2000,且有效数据个数超过总数的一半时,就触发GC回收private boolean checkGC(String queueName) {Stat stat = readStat(queueName);if(stat.totalCount > 2000 && stat.validCount * 2 > stat.totalCount) {return true;}return false;}//新文件位置private String getNewQueueDataDir(String queueName) {return getQueueDir(queueName) + "/queue_data_new.txt";}//GC回收//思路:创建一个新文件(queue_data_new.txt),然后把旧文件中的有效数据拷贝到新文件中// 然后直接删除掉旧文件,然后将原来旧文件的名称给新文件(queue_data.txt)public void gc(MSGQueue queue) throws IOException, ClassNotFoundException {//线程安全问题需要注意synchronized (queue) {//由于gc是比较耗时的操作,所以可以统计一哈消耗的时间long begTime = System.currentTimeMillis();//1.准备好新文件File newFile = new File(getNewQueueDataDir(queue.getName()));if (newFile.exists()) {throw new MqException("gc过程中数据没有删除干净");}boolean ok = newFile.createNewFile();if (!ok) {throw new MqException("新文件创建失败");}//2.将文件里的有效数据个数拷贝到新文件中List<Message> messages = loadAllMessagesFromQueue(queue.getName());try (OutputStream outputStream = new FileOutputStream(getNewQueueDataDir(queue.getName()),true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {for (Message message : messages) {if (message.getIsValid() == 0x1) {byte[] buffer = BinaryTool.toBytes(message);dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}}//3.删除旧文件queue_data.txtFile oldFile = new File(getQueueDataDir(queue.getName()));if (oldFile.exists()) {boolean ok1 = oldFile.delete();if (!ok1) {throw new MqException("删除queue_data.txt文件失败");}}//4.将新文件重新命名为queue_data.txtboolean ok2 = newFile.renameTo(oldFile);if (!ok2) {throw new MqException("重命名queue_data_new.txt文件为queue_data.txt失败");}//5.更新queue_stat.txt文件中的信息Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(), stat);long endTime = System.currentTimeMillis();log.info("gc消耗的时间:" + (endTime - begTime));}}
}
关于Stat类:
是一个静态内部类,用来维护总消息个数(totalCount)和有效消息个数(validCount),队列中有消息写入来totalCount和validCount都会+1(初始化放在了创建队列相关文件方法里面);有消息删除时,validCount会-1;这里约定当toalCount>2000且validCount * 2>totalCount时候,才进行垃圾回收。这里变量的加减操作可能会出现线程安全问题,所以在相关步骤实现了加锁。
关于queue_stat文件的写和读:
主要是readStat和writeStat方法,就是正常的文件读写操作,需要注意两个变量之间以\t作为分隔符,方便后续的数据读取(使用的Scanner中nextInt方法)
关于序列化和反序列化:
在这里封装成了一个类(BinaryTool),里面有两个静态方法:toBytes(序列化,使用ObjectOutputStream),fromBytes(反序列化,使用ObjectInputStream)
具体实现:
public class BinaryTool {//序列化public static byte[] toBytes(Object o) throws IOException {//由于传入的参数是一个变长的对象,所以使用ByteArrayOutputStream流对象就相当于一个变长的数组//就可以把object序列化的二进制数据写入ByteArrayOutputStream,最终在统一转化成byte数组try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {try(ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {//这个方法就是将object对象序列化,并写入到ObjectOutputStream中//由于ObjectOutputStream关联到ByteArrayOutputStream,所以最终序列化后的数据会写入到ByteArrayOutputStream中objectOutputStream.writeObject(o);}return byteArrayOutputStream.toByteArray();}}//反序列化public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {// 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.object = objectInputStream.readObject();}}return object;}
}
关于写入信息:
也就是writeMessage方法,需要注意的是offsetBeg和offsetEnd的设置和打开OutputStream中拼接参数的选项。

序列化是整个message,注意在设置完offsetBeg和offsetEnd之后,后续要在针对当前message进行一次序列化,防止写入的message中这两个变量的值为0。还有不要忘了对queue_stat.txt文件的更新。
关于删除信息:
也就是deleteMessage方法,需要注意的是RandomAccessFile(可读可写)类中光标的移动,所以就需要维护一个变量来存储修改前光标的位置,先将对应位置的数据读取出来(通过两个偏移量,还涉及到反序列化),然后在进行数据修改,完成之后,还需要将光标移动回修改前的位置,最后在将修改完成的数据重新写入刚才的位置(涉及到序列化)。这个过程也只是修改message属性中一个值isValid(将其修改成0x0,无效)
关于加载所有信息:
需要维护一个currentOffset变量,来记录当前数据读取到哪了。在读取过程中使用while(true)死循环,直到抛出EOFException异常,代表读取到文件末尾,只需捕获该异常,就可以停止文件地读取。currentOffset每次是进行加messageSize+4。

关于垃圾回收(gc):
这是采用复制算法,也就是当触发gc回收(总消息个数>2000且有效消息个数 * 2>总消息个数)的时候将有效消息全部拷贝到一个新的文件(queue_new_data.txt)中,然后删除刚才的queue_data.txt文件,最后在将新文件(queue_new_data.txt)命名为(queue_data.txt)。
c.DiskDataManage
该类做的主要工作是对上述两个进行了进一步的封装,为上层提供api的调用。
代码具体实现:
public class DiskDataManage {//使用这个类来操作数据库中数据private DataBaseManage dataBaseManage = new DataBaseManage();//使用这个类来操作文件中的相关数据private MessageFileManage messageFileManage = new MessageFileManage();public void init() {dataBaseManage.init();//此处的init是空方法,等后续需要用的时候,直接在这初始化就行messageFileManage.init();}public void deleteDB() {dataBaseManage.deleteDB();}//封装交换机相关操作public void insertExchange(Exchange exchange) {dataBaseManage.insertExchange(exchange);}public void deleteExchange(String exchangeName) {dataBaseManage.deleteExchange(exchangeName);}public List<Exchange> selectExchanges() {return dataBaseManage.selectExchanges();}//封装队列相关操作public void insertQueue(MSGQueue queue) {dataBaseManage.insertQueue(queue);}public void deleteQueue(String queueName) {dataBaseManage.deleteQueue(queueName);}public List<MSGQueue> selectQueues() {return dataBaseManage.selectQueues();}//封装Binding相关操作public void insertBinding(Binding binding) {dataBaseManage.insertBinding(binding);}public void deleteBinding(Binding binding) {dataBaseManage.deleteBinding(binding);}public List<Binding> selectBindings() {return dataBaseManage.selectBindings();}//封装文件相关操作public void createQueueFiles(String queueName) throws IOException {messageFileManage.createQueueFiles(queueName);}public void deleteQueueFiles(String queueName) throws IOException {messageFileManage.deleteQueue(queueName);}public void writeMessage(MSGQueue queue, Message message) throws IOException {messageFileManage.writeMessage(queue,message);}public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {messageFileManage.deleteMessage(queue,message);}public LinkedList<Message> loadMessages(String queueName) throws IOException, ClassNotFoundException {return messageFileManage.loadAllMessagesFromQueue(queueName);}public void gc(MSGQueue queue) throws IOException, ClassNotFoundException {messageFileManage.gc(queue);}}
(2)数据在内存中存储:
主要是MemoryDataManage类,里面有主要有六个Map和若干方法。考虑到线程安全问题Map是直接采用ConcurrentHashMap。
存储所有信息的Map:
ConcurrentHashMap<String,Message> messageMap(key-messageId,value-message)
存储所有交换机的Map:
ConcurrentHashMap<String,Exchange> exchangeMap(key-exchangeName,value-Exchange)
存储所有队列的Map:
ConcurrentHashMap<String,MSGQueue> queueMap(key-queueName,value-MSGQueue)
存储所有绑定关系的Map:
ConcurrentHashMap<String,ConcurrentHashMap<String,Binding>> bindingMap(key-exchangeName,value-ConcurrentHashMap(field-queueName,value-Binding))
存储每一个队列中的信息的Map:
ConcurrentHashMap<String,LinkedList<Message>> queueMessageMap(key-queueName,value-LinkedList<Message>)
存储所有未被确认的消息:
ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> messageWithAckMap(key-queueName,value-ConcurrentHashMap(field-messageId,value-Message))
代码的具体实现:
@Slf4j
public class MemoryDataManage {//考虑到线程安全问题,直接使用ConcurrentHashMap即可private DiskDataManage diskDataManage = new DiskDataManage();//缓存exchange:key-exchangeName,value-Exchangeprivate ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();//缓存queue:key-queueName,value-MSGQueueprivate ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();//缓存binding:key-exchangeName,value-HashMap(key:queueName,value:Binding)private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();//缓存message:key-messageId(字符串类型),value-Messageprivate ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();//缓存队列与消息之间的关系:key-queueName,value-LinkedList(Message)private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();//缓存当前未被确认的消息(被消费者取走,但还没有应答的消息):key-queueName,value-ConcurrentHashMap<messageId(字符串类型),Message>private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();public void insertExchange(Exchange exchange) {log.info("插入exchange成功");exchangeMap.put(exchange.getName(),exchange);}public void deleteExchange(String exchangeName) {log.info("删除exchange成功");exchangeMap.remove(exchangeName);}public Exchange getExchange(String exchangeName) {log.info("根据exchangeName获取exchange成功");return exchangeMap.get(exchangeName);}public void insertQueue(MSGQueue queue) {queueMap.put(queue.getName(),queue);log.info("插入queue成功");}public void deleteQueue(String queueName) {queueMap.remove(queueName);log.info("删除queue成功");}public MSGQueue getQueue(String queueName) {log.info("根据queueName获取queue成功");return queueMap.get(queueName);}public void insertBinding(Binding binding) {//根据exchangeName查询bindingMap如果不存在就创建,否则不管//ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());//if(bindingMap == null) {//bindingMap = new ConcurrentHashMap<>();//bindingsMap.put(binding.getExchangeName(),bindingMap);//}//下面代码就是上述代码效果ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), t -> new ConcurrentHashMap<>());synchronized (bindingMap) {if (bindingMap.get(binding.getQueueName()) != null) {throw new MqException("当前队列已经被绑定,exchangeName:" + binding.getExchangeName() + " queueName:" + binding.getQueueName());}bindingMap.put(binding.getQueueName(), binding);log.info("绑定成功,exchangeName:" + binding.getExchangeName() + " queueName:" + binding.getQueueName());}}public void deleteBinding(Binding binding) {ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if(bindingMap == null) {throw new MqException("绑定不存在,exchangeName:" + binding.getExchangeName() + " queueName:" + binding.getQueueName());}bindingMap.remove(binding.getQueueName());log.info("删除binding成功");}//这里写两个版本://根据exchangeName和queueName获取唯一的Binding//根据exchangeName获取所有的Bindingpublic Binding getBinding(String queueName,String exchangeName) {ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(exchangeName);if(bindingMap == null) {return null;}return bindingMap.get(queueName);}public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {return bindingsMap.get(exchangeName);}public void addMessage(Message message) {messageMap.put(message.getBasicProperties().getMessageId(),message);log.info("插入message成功");}public void removeMessage(String messageId) {messageMap.remove(messageId);log.info("删除message成功");}public Message getMessage(String messageId) {log.info("根据messageId获取message成功");return messageMap.get(messageId);}//发送消息到指定队列public void sendMessage(String queueName, Message message) {LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queueName,t -> new LinkedList<>());synchronized (messages) {messages.add(message);}addMessage(message);log.info("发送消息到指定队列成功");}//从指定队列中获取消息public Message peekMessage(String queueName) {LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages == null || messages.size() == 0) {log.info("未找到对应队列");return null;}//采用头删的方式获取消息synchronized (messages) {log.info("获取队列中消息成功");return messages.getFirst();}}public Message pollMessage(String queueName) {LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages == null || messages.size() == 0) {log.info("未找到对应队列");return null;}//采用头删的方式获取消息synchronized (messages) {log.info("获取队列中消息成功");return messages.pollFirst();}}//获取指定消息队列中的消息个数public int getCounts(String queueName) {LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages == null || messages.size() == 0) {log.info("当前队列中消息个数为0");return 0;}synchronized (messages) {log.info("获取指定队列中消息个数成功");return messages.size();}}//添加未确认的消息(已经被取走了,但消费者还没有做出确认,确认确实已经进行了处理该消息)public void addMessageWaitAck(String queueName,Message message) {ConcurrentHashMap<String,Message> messages = queueMessageWaitAckMap.computeIfAbsent(queueName,t -> new ConcurrentHashMap<>());synchronized (messages) {messages.put(message.getBasicProperties().getMessageId(), message);log.info("添加未确认的消息成功");}}//删除未确认的消息(消费者这边已经确认了)public void removeMessageWaitAck(String queueName,String messageId) {ConcurrentHashMap<String, Message> messages = queueMessageWaitAckMap.get(queueName);if (messages == null && messages.size() == 0) {return;}synchronized (messages) {messages.remove(messageId);log.info("删除未确认的消息成功");}}//获取指定的未确认的消息public Message getMessageWaitAck(String queueName,Message message) {ConcurrentHashMap<String, Message> messages = queueMessageWaitAckMap.get(queueName);if (messages == null && messages.size() == 0) {log.info("获取指定队列未确认消息失败");return null;}log.info("获取指定队列未确认消息成功");return messages.get(message.getBasicProperties().getMessageId());}//加载data文件中的数据到内存中(重启后应该做的工作)public void recovery(DiskDataManage diskDataManage) throws IOException, ClassNotFoundException {//1.删除内存中的所有数据messageMap.clear();exchangeMap.clear();queueMap.clear();bindingsMap.clear();queueMessageMap.clear();//2.恢复queueMap中数据List<MSGQueue> queues = diskDataManage.selectQueues();for(MSGQueue queue : queues) {queueMap.put(queue.getName(),queue);}log.info("queueMap中数据恢复完成");//3.恢复messageMap和queueMessageMap中的数据for(MSGQueue queue : queues) {LinkedList<Message> messages = diskDataManage.loadMessages(queue.getName());queueMessageMap.put(queue.getName(), messages);for(Message message : messages) {messageMap.put(message.getBasicProperties().getMessageId(),message);}}//4.恢复exchangeMap中的数据List<Exchange> exchanges = diskDataManage.selectExchanges();for(Exchange exchange : exchanges) {exchangeMap.put(exchange.getName(),exchange);}//5.恢复bindsingMap中的数据List<Binding> bindings = diskDataManage.selectBindings();for(Binding binding : bindings) {ConcurrentHashMap<String,Binding> bind = new ConcurrentHashMap<>();bind.put(binding.getQueueName(),binding);bindingsMap.put(binding.getExchangeName(),bind);}log.info("bindsingMap中数据恢复完成");//注意:"未被确认的消息"不需要从硬盘上恢复// 在等待ack的过程中(服务器),一旦当服务器重启了过后这部分数据就会变成"未取走的消息"//未取走的消息就是在硬盘上存储的,此时消费者需要重新从对列中取}
}
上述类中的方法主要是对对应map的增删查改(在这个过程需要注意线程安全问题,主要是针对未使用线程安全的集合类来说,比如ArrayList,LinkedList)。
关于发送消息到指定队列:
不仅需要再messageMap中添加,还需要再queueMessageMap中添加。
关于未确认消息的集合:
消费者如果在对某个消息进行消费过后(相当于第一次确认消息已经被消费了),是需要进行消息确认(进行二次确认消息已经被消费了),这个消息确认分为自动确认(消息一经第一次消费过后,数据就会进行删除)和手动确认(手动调用最开始说的basicAck方法),在进行消息确认之后才会进行内存上相关信息数据的删除,否则不进行删除。这个消息确认是通过一个变量(autoAck)来进行的,true:自动确认,false:手动确认
关于内存中数据的恢复:
也就是recovery方法,通过这个方法可以在服务器进行重启的时候,进行内存中数据的恢复,所以该方法应该是放在最初初始化的时候执行。需要注意的是未确认消息的集合不需要进行恢复,如果已经进行了消费但是还没有进行确认的话,此时正好服务器又重启了,就可能会导致重复消费的问题,这个问题是消费者这边所考虑的问题。
(3)两个核心类:
VirtualHost和BrokerServer(消息队列服务器本体)
①VirtualHost
数据库和表的初始化操作放在了VirtualHost的构造方法中,在该方法中还需要调用recovery方法,对内存中数据的恢复。为了不同虚拟主机中能使用的队列/交换机相同,所以在下面所有方法中对queueName和exchangeName进行了处理(在对应的交换机名和队列名前面加上当前的主机名)
然后就是为上层提供九个api:exchangeDeclare,exchangeDelete,queueDeclare,queueDelete,queueBind,queueUnbind,basicPublish,basicConsume,basicAck
ps:在这个类中还有两个锁对象:queueLocker和exchangeLocker(考虑到多线程环境下操作可能会出问题,所以会对相关操作加锁)
a.exchangeDeclare(创建交换机):
在创建之前会先查看内存中是否已经存在所需创建的交换机,需要传入的参数:exchangeName(String,交换机名称),type(ExchangeType,交换机类型),durable(boolean,是否持久化),autoDelete(boolean,是否自动删除,未实现),arguments(Map<String,Object>,扩展项)
代码具体实现:
public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable, boolean autoDelete,Map<String,Object> arguments) {//区分不同虚拟主机上的交换机exchangeName = VirtualHostName + exchangeName;synchronized (exchangeLocker) {try {//1.判断对应交换机(内存上)是否存在Exchange toAdd = memoryDataManage.getExchange(exchangeName);if (toAdd != null) {throw new MqException("[VirtualHost] 当前需要创建的交换机已经存在");}//2.如果不存在就进行创建Exchange newExchange = new Exchange();newExchange.setArguments(arguments);newExchange.setType(type);newExchange.setDurable(durable);newExchange.setName(exchangeName);newExchange.setAutoDelete(autoDelete);//3.写入硬盘(持久化)if (durable) {diskDataManage.insertExchange(newExchange);}//4.写入内存memoryDataManage.insertExchange(newExchange);log.info("[VirtualHost] 交换机创建成功");return true;}catch (Exception e) {//一旦上述过程发生异常,就说明交换机创建失败log.info("[VirtualHost] 交换机创建失败");e.printStackTrace();return false;}}
}
ps:上述的参数是仿照这RabbitMQ来搞的(包括下面的所有方法的参数)。
b.exchangeDelete(销毁交换机):
只需要传入一个参数:exchangeName(String),根据exchangeName去查看内存中是否存在,如果存在,则根据查出来的exchange,看是否进行了持久化存储,如果是持久化存储的,则进行删除,否则则只需要删除内存中的exchange即可(也就是存储在exchangeMap中的exchange)
代码具体实现:
public boolean exchangeDelete(String exchangeName) {exchangeName = VirtualHostName + exchangeName;try {synchronized (exchangeLocker) {//1.首先判断对应交换机是否存在Exchange toDelete = memoryDataManage.getExchange(exchangeName);if (toDelete == null) {throw new MqException("[VirtualHost] 当前交换机不存在");}//2.进行硬盘上的删除if(toDelete.isDurable()) {diskDataManage.deleteExchange(exchangeName);}//3.进行内存删除memoryDataManage.deleteExchange(exchangeName);log.info("[VirtualHost] 删除交换机成功");return true;}}catch (Exception e) {e.printStackTrace();log.info("[VirtualHost] 删除交换机失败");return false;}
}
c.queueDeclare(创建队列):
需要传入的参数是queueName(String,队列名称),durable(boolean,是否持久化),autoDelete(boolean,是否自动删除,未实现),exclusive(boolean,是否独占),arguments(Map<String,Object>,扩展项)
代码具体实现:
public boolean queueDeclare(String queueName,boolean durable,boolean autoDelete,boolean exclusive,Map<String,Object> argument) {queueName = VirtualHostName + queueName;try {synchronized (queueLocker) {//首先创建queue相关文件diskDataManage.createQueueFiles(queueName);//1.判断当前队列是否存在MSGQueue queue = memoryDataManage.getQueue(queueName);if (queue != null) {throw new MqException("[VirtualHost] 当前队列已经存在");}//2.创建新的队列MSGQueue toAdd = new MSGQueue();toAdd.setExclusive(exclusive);toAdd.setArguments(argument);toAdd.setName(queueName);toAdd.setDurable(autoDelete);toAdd.setDurable(durable);//3.存储在硬盘中if (durable) {diskDataManage.insertQueue(toAdd);}//4.存储在内存中memoryDataManage.insertQueue(toAdd);log.info("[VirtualHost] 创建队列成功");return true;}}catch (Exception e) {//如果出现异常,则说明创建队列失败e.printStackTrace();log.info("[VirtualHost] 创建队列失败");return false;}
}
d.queueDelete(销毁队列):
需要传入的参数:queueName(String,队列名称),核心逻辑跟销毁队列一样
代码具体实现:
public boolean queueDelete(String queueName) {//首先对queueName进行处理queueName = VirtualHostName + queueName;try {synchronized (queueLocker) {//1.检查队列是否存在MSGQueue queue = memoryDataManage.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 当前队列不存在");}//2.进行判断当前队列是否持久化,如果持久化才进行硬盘删除if (queue.isDurable()) {diskDataManage.deleteQueue(queueName);}//3.删除内存中的存储的数据memoryDataManage.deleteQueue(queueName);//4.删除queue目录文件diskDataManage.deleteQueueFiles(queueName);}log.info("[VirtualHost] 队列删除成功");return true;}catch (Exception e) {e.printStackTrace();log.info("[VirtualHost] 队列删除失败");return false;}
}
e.queueBind(创建绑定):
需要传入的参数:exchangeName(String,交换机名称),queueName(String,队列名称),bindingKey(String)
具体的代码实现:
public boolean queueBind(String queueName, String exchangeName, String bindingKey) {queueName = VirtualHostName + queueName;exchangeName = VirtualHostName + exchangeName;try {synchronized(exchangeLocker) {synchronized (queueLocker) {//1.先根据queueName和exchangeName查看绑定是否存在Binding binding = memoryDataManage.getBinding(queueName,exchangeName);if(binding != null) {throw new MqException("[VirtualHost] 当前绑定已存在");}//2.检查一下bindingKey是否合法if(!Router.checkBindingKey(bindingKey)) {throw new MqException("[VirtualHost] bindingKey不合法,创建绑定失败");}//3.检查对应的交换机和队列是否存在MSGQueue queue = memoryDataManage.getQueue(queueName);Exchange exchange = memoryDataManage.getExchange(exchangeName);if(queue == null || exchange == null) {throw new MqException("[VirtualHost] 交换机或队列不存在,创建队列失败");}//4.构造binding对象Binding newBinding = new Binding();newBinding.setBindingKey(bindingKey);newBinding.setQueueName(queueName);newBinding.setExchangeName(exchangeName);//5.看队列和交换机是否持久化if(queue.isDurable() && exchange.isDurable()) {diskDataManage.insertBinding(newBinding);}//6.写入内存memoryDataManage.insertBinding(newBinding);log.info("[VirtualHost] 创建绑定成功");}}return true;}catch (Exception e) {e.printStackTrace();log.info("[VirtualHost] 创建绑定失败");return false;}
}
ps:只要创建绑定的交换机或者队列不存在,那么此时创建绑定都是无意义的,如果都存在的话,交换机和队列都进行持久化了,此时绑定进行持久化才有意义。
f.queueUnbind(销毁绑定):
需要传入的参数:exchangeName(String,交换机名称),queueName(String,队列名称)
首先根据exchangeName和queueName在bindingMap中查找当前的binding是否存在,如果不存在直接抛异常然后返回,如果存在则直接进行硬盘(不管硬盘上是否存在都进行删除,省去查看交换机和队列是否持久化的操作)和内存上的binding的删除。
具体代码实现:
public boolean queueUnBind(String queueName,String exchangeName) {queueName = VirtualHostName + queueName;exchangeName = VirtualHostName + exchangeName;try {synchronized(exchangeLocker) {synchronized (queueLocker) {//1.检查当前绑定是否存在Binding binding = memoryDataManage.getBinding(queueName,exchangeName);if(binding == null) {throw new MqException("[VirtualHost] 当前绑定不存在");}//2.无论交换机和队列是否存在都进行删除,先删除硬盘diskDataManage.deleteBinding(binding);//3.再删内存memoryDataManage.deleteBinding(binding);log.info("[VirtualHost] 删除绑定成功");}}return true;}catch (Exception e) {e.printStackTrace();log.info("[VirtualHost] 删除绑定失败");return false;}
}
g.basicPublish(发送消息):
需要传入的参数:exchangeName(String,交换机名称),routingKey(String),basicProperties(BasicProperties,消息的基本属性),body(byte[],消息的内容)
具体的代码实现:
public boolean basicPublish(String exchangeName,String routingKey,BasicProperties basicProperties,byte[] body) {try {synchronized (exchangeLocker) {synchronized (queueLocker) {exchangeName = VirtualHostName + exchangeName;//1.查询当前交换机/队列是否存在Exchange exchange = memoryDataManage.getExchange(exchangeName);if(exchange == null) {throw new MqException("[VirtualHost] 当前交换机不存在,发送消息失败");}//2.检查routingKey的合法性if(!Router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey不合法,发送消息失败");}//3.判断当前交换机的类型,不同类型转发方式不同if(exchange.getType() == ExchangeType.DIRECT) {//直接交换机,转发到以routingKey作为队列的名字,添加到对应队列中去String queueName = VirtualHostName + routingKey;MSGQueue queue = memoryDataManage.getQueue(queueName);if(queue == null) {throw new MqException("[VirtualHost] 队列不存在,发送消息失败");}//构造message对象basicProperties.setRoutingKey(routingKey);Message message = Message.createMessageWithId(basicProperties,body);//发送到指定队列中sendMessage(queueName,message);}else {//还有扇出(每一个队列发送信息)和主题交换机(routingKey和BindingKey对应上的队列才发送信息)//使用这两种交换机的话,肯定需要遍历所有队列ConcurrentHashMap<String,Binding> bindingsMap = memoryDataManage.getBindings(exchangeName);for(Map.Entry<String,Binding> entry : bindingsMap.entrySet()) {//(1)获取当前队列名称和bindingString queueName = entry.getKey();Binding binding = entry.getValue();//(2)判定当前队列是否存在,如果不存在,直接抬走下一位MSGQueue queue = memoryDataManage.getQueue(queueName);if(queue == null) {log.info("[VirtualHost] 当前队列不存在");continue;}basicProperties.setRoutingKey(routingKey);//3.判断交换机类型是否为主题交换机if(!Router.route(exchange.getType(),binding,basicProperties)) {continue;}//4.队列已经存在,构造message对象Message message = Message.createMessageWithId(basicProperties,body);//5.添加信息sendMessage(queueName,message);}}log.info("[VirtualHost] 添加信息成功");return true;}}}catch (Exception e) {e.printStackTrace();log.info("[VirtualHost] 发送信息失败");return false;}
}private void sendMessage(String queueName, Message message) throws IOException, InterruptedException {//1.判断队列是否存在MSGQueue queue = memoryDataManage.getQueue(queueName);if(queue == null) {throw new MqException("[VirtualHost] 当前要写的队列不存在");}//2.判断消息是否需要持久化if(message.getBasicProperties().getDeliveryMode() == 2) {diskDataManage.writeMessage(queue,message);}//3.写入内存memoryDataManage.sendMessage(queueName,message);//4.提醒消息者消费消息consumerManager.notifyConsumer(queueName);
}
关于routingKey和bindingKey的校验:
约定routingKey的形式:包含数字/字母(大小写)/下划线,用.进行分割
约定bindingKey的形式:包含数字/字母(大小写)/下划线,用.进行分割,*表示1个部分/#表示0个或多个部分。ps:aaa.a*.ccc/aaa.*c.ccc是不合法的
eg:routingKey:aaa.bbb.ccc.ddd bindingKey:aaa.#.ddd/aaa.*.*.ddd 可以匹配成功
但bindingKey里面不允许出现aaa.*.#.ddd/aaa.#.*.ddd/aaa.#.#.ddd的形式,这种形式在后续进行匹配的时候不好进行处理,所以可以提前对这三种形式进行一下判断。
如果当前为直接交换机,bindingKey用不到,消息需要转发到以routingKey作为名称的队列,此时只主要对routingKey做一下合法性校验。
如果当前为扇出交换机,bindingKey和routingKey都用不到,消息转发到与当前交换机进行了绑定的队列中(只需要交换机的名称)
如果当前为主题交换机,bindingKey和routingKey都会用到,此时就会涉及到两个key的校验了,如果校验成功了才进行转发消息,否则则不进行转发。
具体代码实现放在一个Router类中:
public class Router {/*bindingKey命名规则:1.数字,下划线,字母2.使用.进行分割3.允许*或#作为一部分或多部分*/public static boolean checkBindingKey(String bindingKey) {//也是合法的情况,直接和扇出交换机用不到,所以可以设置为""if(bindingKey.length() == 0) {return true;}for (int i = 0; i < bindingKey.length(); i++) {if(bindingKey.charAt(i) >= 'A' && bindingKey.charAt(i) <= 'Z') {continue;}if(bindingKey.charAt(i) >= 'a' && bindingKey.charAt(i) <= 'z') {continue;}if(bindingKey.charAt(i) == '.' && bindingKey.charAt(i) == '_') {continue;}if(bindingKey.charAt(i) >= '0' && bindingKey.charAt(i) <= '9') {continue;}if(bindingKey.charAt(i) == '*' && bindingKey.charAt(i) == '#') {continue;}//除此之外其他情况都是返回不合法的return false;}//检查*和#是否为独立的部分//aaa.*.bbb合法,aaa.a*.bbb/aaa.*a.bbb/aaa.#a.bbb不合法String[] strs = bindingKey.split("\\.");for(String s : strs) {if(s.length() > 1 && (s.contains("*") || s.contains("#"))) {return false;}}//这里认为在进行约定一下//如果使用前三种的话,匹配起来就有点麻烦,同时功能性不大//a.#.*.b 不合法//a.#.#.b 不合法//a.*.#.b 不合法//a.*.*.b 合法for(int i = 0; i < strs.length-1; i++) {if(strs[i].equals("#") && strs[i+1].equals("*")) {return false;}if(strs[i].equals("*") && strs[i+1].equals("#")) {return false;}if(strs[i].equals("#") && strs[i+1].equals("#")) {return false;}}return true;}/*routingKey命名规则:1.数字。下划线,字母2.用.进行分割*/public static boolean checkRoutingKey(String routingKey) {//进行判空,有些时候(扇出交换机用不到)不需要routingKey就可以设置为"",但是并不代表是错误的if(routingKey.length() == 0) {return true;}for (int i = 0; i < routingKey.length(); i++) {if(routingKey.charAt(i) >= 'A' && routingKey.charAt(i) <= 'Z') {continue;}if(routingKey.charAt(i) >= 'a' && routingKey.charAt(i) <= 'z') {continue;}if(routingKey.charAt(i) == '.' && routingKey.charAt(i) == '_') {continue;}if(routingKey.charAt(i) >= '0' && routingKey.charAt(i) <= '9') {continue;}//除此之外其他情况都是返回不合法的return false;}return true;}public static boolean route(ExchangeType type, Binding binding, BasicProperties basicProperties) {if(type == ExchangeType.FANOUT) {return true;}else if(type == ExchangeType.TOPIC) {return routeTopic(binding.getBindingKey(),basicProperties.getRoutingKey());}else {System.out.println("传入的交换机类型非发");return false;}}// [测试用例]// binding key routing key result// aaa aaa true// aaa.bbb aaa.bbb true// aaa.bbb aaa.bbb.ccc false// aaa.bbb aaa.ccc false// aaa.bbb.ccc aaa.bbb.ccc true// aaa.* aaa.bbb true// aaa.*.bbb aaa.bbb.ccc false// *.aaa.bbb aaa.bbb false// # aaa.bbb.ccc true// aaa.# aaa.bbb true// aaa.# aaa.bbb.ccc true// aaa.#.ccc aaa.ccc true// aaa.#.ccc aaa.bbb.ccc true// aaa.#.ccc aaa.aaa.bbb.ccc true// #.ccc ccc true// #.ccc aaa.bbb.ccc truepublic static boolean routeTopic(String bindingKey,String routingKey) {//进行routingKey和bindingKey的匹配//将bindingKey和routingKey进行分割(按照".")String[] bindings = bindingKey.split("\\.");int bindLen = bindings.length;String[] routings = routingKey.split("\\.");int routeLen = routings.length;int i = 0;int j = 0;while(i < bindLen && j < routeLen) {if(bindings[i].equals("*")) {i++;j++;continue;}else if(bindings[i].equals("#")) {i++;if(i == bindLen) {//已经到bindings末尾了,直接返回成功return true;}System.out.println(i);//还没到末尾,需要去routings去找到bindings[i],如果没找到此时两个key已然不相同int routingIndex = findRoutingIndex(bindings,routings,i,j);if(routingIndex == -1) {//没找到return false;}j = routingIndex;}else if(bindings[i].equals(routings[j])){//bindings[i]和routings[i]相同i++;j++;continue;}else {return false;}}if(i == bindLen && j == routeLen) {return true;}return false;}private static int findRoutingIndex(String[] bindings,String[] routings, int i, int j) {for(int k = j; k < routings.length; k++) {if(routings[k].equals(bindings[i])) {return k;}}return -1;}}
h.basicConsume(订阅消息):
需要传入的参数:consumerTag(String,消费者身份标识),queueName(String,队列名称),autoAck(boolean,自动确认)),consumer(Consumer,消费者执行回调的入口)
Consumer:是一个函数式接口,里面只有一个方法:handleDelivery,delivery是投递的意思,当某个消费者订阅了消息,服务器接收到了某个生产者生产的消息就会将这个消息推送给当前订阅消息的消费者,消费者接收到这个消息,就是通过这里的这个回调的方法来进行消费信息。
主要有三个参数:
@FunctionalInterface
public interface Consumer {//deliver是“投递”的意思,这个方法意在每次服务器接收到信息后,// 都把消息交给消费者进行消费//这里的参数也是参考RabbitMQ来搞的//consumerTag表示消费者的身份标识void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}
消费者可以通过重写这个方法来进行自己的逻辑代码的实现。
在这个方法中还涉及到两个类:ConsumerEnv(真正的消费者)和ConsumerManager(消息者管理)
ConsumerEnv:
@Data
public class ConsumerEnv {//消费者身份标识private String consumerTag;//队列名称private String queueName;//是否是自动确认消息,还是手动确认private boolean autoAck;private Consumer consumer;public ConsumerEnv(String consumerTag,String queueName,boolean autoAck,Consumer consumer) {this.autoAck = autoAck;this.consumer = consumer;this.consumerTag = consumerTag;this.queueName = queueName;}
}
ConsumerManager:
是消费者消费信息的核心类,消费者如何能够检测到服务器这边接收到了信息,以及如何将信息推送给消费者进行消费在这个类中得到了很好的回答。
在这个类中维护了一个阻塞队列,这个队列用来存放有消息的队列,一旦有生产者生产了消息,都会调用这个方法将该队列名放进该阻塞队列;而这个取阻塞队列中的元素的操作放在一个单独运行的后台线程中实现,如果队列中没有元素,那么整个线程将会被阻塞,并且这个过程是循环的(也就是在取到元素,完成后续操作(通过队列名查找队列,然后进行消费信息)后,又会回到最开始去取元素,然后继续完成后续操作);同时还维护了一个ArrayList<ConsumerEnv>顺序表,用来存放当前所有队列中的消费者,为后序队列非独占的实现做准备(eg:queue1中的消息所有队列中的消费者都能进行消费,否则就只能queue1中的消费者进行消费)
核心方法:
addConsumer:
根据queueName找到对应队列,然后将消费者添加进入队列中维护的ArrayList中,如果没有查找到对应队列,直接抛异常。还需要往ConsumerManager中维护的ArrayList添加;如果当前队列中有消息的话,还需要进行消费完。
下面两个元素是在MSGQueue中维护的:

轮询:
consumerSeq是用来进行轮询的,如果当前队列中有两个消费者,第一次取出消费者1进行消费,第二次取出消费者2进行消费,第三次取出消费者1进行消费........
每次取出consumerEnvList.get(consumerSeq.get()%consumerEnvList.size()),然后consumerSeq++,就可以完成轮询操作(也就是先完成消费的,需要重新排队进行消费,但并不是真正的把消费者移动到顺序表末尾,每次取对首元素)
consumeMessage:
当前方法的执行是在上述所说的后台线程中执行,执行的契机是阻塞队列中有元素时(也就是服务器成功接收到消息,并且成功转发到某个队列中),一旦执行到这个方法,就需要找一个消费者进行消费,这里有两种情况:
队列独占(exclusive:true):直接通过当前队列中的存放当前当前队列所有消费者的顺序表轮询出一个消费者即可,如果没有消费者直接抛异常。
MSGQueue中的一个方法:

队列不独占(exclusive:false):此时需要从ConsumerManager中维护的存放所有队列的消费者的顺序表轮询出一个消费者,如果没有消费者直接抛异常。
ConsumerManager中的一个方法:

消费消息的过程可以交给线程池来执行,这样在同一时刻可以消费多个消息。同时如果当前消费者采用的是自动确认,那么在消费完成信息过后,需要进行相关数据的删除,如果是手动确认还需要调用方法basicAck。
notifyConsumer:
向当前阻塞队列中添加有消息的队列,是在basicPublish中调用的。
具体代码实现:
@Slf4j
public class ConsumerManager {//存储上层的VirtualHost的对象引用,用来操作数据private VirtualHost parent;//创建一个线程池,其中线程用来调用回调函数,消费消息private ExecutorService executorService = Executors.newFixedThreadPool(4);//存放令牌的队列private BlockingQueue<String> tokens = new LinkedBlockingQueue<>();//扫描整体队列的线程private Thread thread;//用来存放当前所有队列中有哪些有消费者(为后续队列实现非独占做准备)private ArrayList<ConsumerEnv> consumerLists = new ArrayList<>();//方便采用轮询(第一次消费过后需要重新排队进行消费)private AtomicInteger seqNumber = new AtomicInteger(0);public ConsumerManager(VirtualHost virtualHost) {parent = virtualHost;thread = new Thread(() -> {while(true) {try {//1.拿到令牌(队列名)String queueName = tokens.take();//2.根据令牌找到队列MSGQueue queue = parent.getMemoryDataManage().getQueue(queueName);if(queue == null) {throw new MqException("当前队列不存在,queueName: " + queueName);}//3.消费消息(有可能产生线程安全问题所以需要对当前队列加锁)synchronized(queue) {try {consumeMessage(queue);}catch (MqException e) {e.printStackTrace();log.info("消费信息失败");}}} catch (InterruptedException e) {e.printStackTrace();}}});//设置为后台线程thread.setDaemon(true);thread.start();}public void notifyConsumer(String queueName) throws InterruptedException {tokens.put(queueName);}public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {try {//1.根据队列名找到队列MSGQueue queue = parent.getMemoryDataManage().getQueue(queueName);if(queue == null) {throw new MqException("[consumerManager] 当前队列不存在,queueName: " + queueName);}//2.构造消费者ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);//3.添加消费者synchronized (queue) {queue.addConsumerEnv(consumerEnv);consumerLists.add(consumerEnv);//如果当前队列中已经有了消息就直接消费掉int totalMessages = parent.getMemoryDataManage().getCounts(queueName);while(totalMessages > 0) {//每一次循环就消费掉一个消息consumeMessage(queue);totalMessages--;}}log.info("[consumerManager] 添加消费者到队列中成功");}catch (Exception e) {e.printStackTrace();log.info("[consumerManager] 添加消费者到队列中失败,queueName: " + queueName + " ,consumerTag: " + consumerTag);}}private void consumeMessage(MSGQueue queue){//1.按照轮询的方式,查询出一个消费者ConsumerEnv luckier = null;log.info("queueName: " + queue.getName() + ",exclusive: " + queue.isExclusive());if(queue.isExclusive()) {//独占luckier = queue.chooseConsumerEnvExclusive();if (luckier == null) {throw new MqException("[consumerManager] 当前队列中没有消费者,queueName: " + queue.getName());}log.info("独占 queueName: " + luckier.getQueueName() + ",exclusive: " + queue.isExclusive());}else {//非独占luckier = chooseConsumerEnvNotExclusive();if (luckier == null) {throw new MqException("[consumerManager] 当前所有队列中没有消费者,queueName: " + queue.getName());}log.info("非独占 queueName: " + luckier.getQueueName() + ",exclusive: " + queue.isExclusive());}ConsumerEnv finalLuckier = luckier;//2.取出一个消息(实际内存还存在)Message message = parent.getMemoryDataManage().peekMessage(queue.getName());if(message == null) {throw new MqException("[consumerManager] 当前队列中没有消息,queueName " + queue.getName());}//3.消费消息caiyonexecutorService.submit(() -> {try {//将未确认的消息存储在内存中parent.getMemoryDataManage().addMessageWaitAck(queue.getName(),message);//执行回调函数,消费消息finalLuckier.getConsumer().handleDelivery(finalLuckier.getConsumerTag(),message.getBasicProperties(),message.getBody());//这里有两种确认方式:// 手动确认:需要手动basicAck才能彻底将消息进行消费// 自动确认:无需确认消息一经调用就被消费掉if(finalLuckier.isAutoAck()) {//删除在硬盘上存储的消息if(message.getBasicProperties().getDeliveryMode() == 2) {parent.getDiskDataManage().deleteMessage(queue, message);}//删除在内存中的存储的消息(未确认)parent.getMemoryDataManage().removeMessageWaitAck(queue.getName(),message.getBasicProperties().getMessageId());//删除在内存中的存储的消息(实际存储的),删除总的存储message地方中的messageparent.getMemoryDataManage().removeMessage(message.getBasicProperties().getMessageId());//删除对应对列中messageparent.getMemoryDataManage().pollMessage(queue.getName());}}catch (Exception e) {e.printStackTrace();log.info("消费消息失败");}});log.info("消费消息成功");}private ConsumerEnv chooseConsumerEnvNotExclusive() {if(consumerLists.size() == 0) {return null;}int index = seqNumber.get() % consumerLists.size();seqNumber.incrementAndGet();return consumerLists.get(index);}
}
i.basicAck(手动确认):
需要传入的参数有:queueName(String),messageId(String)
根据当前两个参数查找对应的队列和message,如果都存在再进行数据的删除
具体代码实现:
public boolean basicAck(String queueName,String messageId) {try {//1.获取到对应的队列和消息MSGQueue queue = memoryDataManage.getQueue(queueName);if(queue == null) {throw new MqException("[VirtualHost] 当前队列不存在");}Message message = memoryDataManage.getMessage(messageId);if(message == null) {throw new MqException("[VirtualHost] 当前消息不存在");}//2.进行硬盘和内存上数据的删除memoryDataManage.removeMessage(messageId);memoryDataManage.pollMessage(queueName);if(message.getBasicProperties().getDeliveryMode() == 2) {//表示消息已经持久化,需要删除diskDataManage.deleteMessage(queue,message);}memoryDataManage.removeMessageWaitAck(queueName,messageId);log.info("[VirtualHost] 手动确认消息消费完成成功");return true;}catch (Exception e) {e.printStackTrace();log.info("[VirtualHost] 手动确认消息消费完成失败");return false;}
}
②BrokerServer
这个类主要就是接收请求(包括消费者,生产者发送的消息请求),然后解析请求,计算响应并且返回。
所以首先需要做的就是构造一个请求,这个过程是在Channel中做的,而提到Channel就不得不提到Connection,在文章开头也提到过这两者之间的关系,一个客户端只需要建立一次连接,通过这一次连接就可以发送多个请求,大大提高了网络传输效率。
这里采用自定义应用层协议:

客户端只需要调用对应的方法,然后服务器根据请求中的type来判断当前客户端调用的何种方法,从而进行响应计算和返回,客户端这边只会收到成功/失败的信息,客户端并不知道具体的处理过程,这种方式叫做:RPC(远程方法调用)
协议类型:

a.Channel类:
该类主要完成开始的9大方法的请求的构造。
该类中为维护了channelId(当前channel的身份标识),consumer(执行回调,一个channel中只能允许有一个consumer来执行回调,所以在订阅消息的时候,要首先进行判空),connection(当前channel属于哪个连接),ConcurrentHashMap<String,BasicReturns>(key为:rid,当前请求/响应的id,value:返回的响应)
如果是普通的响应(0x1-0xb请求返回的响应),那么使用BasicReturns(参数有:rid,channelId,ok(请求是否成功))这个类就好了,这里使用rid作为key,是为了防止请求发送的顺序与响应返回的顺序不同(eg:请求1先发送,请求2后发送,但是请求2却先返回,请求1后返回,使用map通过rid查询就可以查询出对应请求的响应)。每一次发送请求会通过UUID类生成一个唯一的rid(不同之间请求/响应的rid肯定不同嘛)
如果是服务器向客户端推送消息的响应(0xc,响应独有的类型),那么使用SubScribeReturns(继承于BasicReturns,在此之上还有参数:consumerTag,BasicProperties,body)。
类中还有个重要的方法:waitBasicReturns(等待响应),在方法实现的内部,如果当前响应未返回则会wait进行等待(就是通过查看上述Map中是否有元素),直到有方法(这里采用notifyAll,因为不知道有几个线程在进行等待,唤醒之后只需要在方法中进行再次的判断Map是否有元素,有的就直接返回当前取到的返回值,否则则继续等待)进行唤醒。
其他方法就是针对上述9大方法的请求构造(按照自定义应用层协议),先写type,再写length,最后写payload(负载:请求内容)
在实现具体代码之前还需要对相关请求类进行构造:
BasicArguments:(下面所有类(由于通过网络传输,都需要实现Serializable接口)的父类,公有属性)
@Data
public class BasicArguments implements Serializable {//表示一次请求/响应的身份标识,可以将请求与响应对应起来private String rid;//当前channel的身份标识private String channelId;
}
ExchangeDeclareArguments:
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private boolean durable;private boolean autoDelete;private ExchangeType type;private Map<String,Object> arguments;
}
ExchangeDeleteArguments:
@Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {private String exchangeName;
}
QueueDeclareArguments:
@Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {private String queueName;private boolean durable;private boolean autoDelete;private boolean exclusive;private Map<String,Object> arguments;
}
QueueDeleteArguments:
@Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {private String queueName;
}
QueueBindArguments:
@Data
public class QueueBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;private String bindingKey;
}
QueueUnbindArguments:
@Data
public class QueueUnbindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;
}
BasicPublishArguments:
@Data
public class BasicPublishArguments extends BasicArguments implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body;
}
BasicConsumeArguments:
@Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;private Consumer consumer;
}
BasicAckArguments:
@Data
public class BasicAckArguments extends BasicArguments implements Serializable {private String queueName;private String messageId;
}
Channel类实现:
@Slf4j
@Data
public class Channel {private String channelId;//标识当前channel属于哪个连接private Connection connection;//维护一个哈希表,用来存储服务器返回来的响应//key-ridprivate ConcurrentHashMap<String,BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();//如果某个channel订阅了消息,那么就需要服务器在推送回来消息的时候,执行一下回调函数,也就是消费推送回来的消息//这里约定一个channel中只能有一个consumer来执行回调private Consumer consumer;public Channel(String channelId,Connection connection) {this.connection = connection;this.channelId = channelId;}//与服务器之间的交互//通过这个类告诉服务器我要创建一个channelpublic boolean createChannel() throws IOException {//1.构造payloadBasicArguments basicArguments = new BasicAckArguments();basicArguments.setRid(createNewRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(basicArguments);//2.构造响应Request request = new Request();request.setPayload(payload);request.setLength(payload.length);request.setType(0x1);//3.发送请求connection.writeRequest(request);//4.等待服务器的响应BasicReturns basicReturns = waitBasicReturns(basicArguments.getRid());return basicReturns.isOk();}private BasicReturns waitBasicReturns(String rid) {BasicReturns basicReturns = null;while((basicReturns = basicReturnsMap.get(rid)) == null) {synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();log.info("[Channel] 服务器响应返回,等待结束");}}}//在返回结果之前还需要把对应哈希表中的数据给进行清除//虽然每一次生成的rid都不同,好像也不会影响后续连接但是这个记录会一直在哈希表中,直到服务器停止//但是可以让内存压力小一点basicReturnsMap.remove(rid);return basicReturns;}private String createNewRid() {return "R-" + UUID.randomUUID();}//在Connection中扫描到了对应的响应在调用这个方法public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(),basicReturns);synchronized (this) {//由于当前不知道等待的线程有多少,所以直接全部唤醒notifyAll();}}//告诉服务器销毁channelpublic boolean closeChannel() throws IOException {//1.构造payloadBasicArguments basicArguments = new BasicAckArguments();basicArguments.setChannelId(channelId);basicArguments.setRid(createNewRid());byte[] payload = BinaryTool.toBytes(basicArguments);//2.构造响应Request request = new Request();request.setPayload(payload);request.setLength(payload.length);request.setType(0x2);//3.发送请求connection.writeRequest(request);//4.等待服务器的响应BasicReturns basicReturns = waitBasicReturns(basicArguments.getRid());return basicReturns.isOk();}//告诉服务器创建一个交换机public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable, boolean autoDelete,Map<String,Object> arguments) throws IOException {//1.构造payloadExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setType(type);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setRid(createNewRid());byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);//2.构造响应Request request = new Request();request.setPayload(payload);request.setLength(payload.length);request.setType(0x3);//3.发送请求connection.writeRequest(request);//4.等待服务器的响应BasicReturns basicReturns = waitBasicReturns(exchangeDeclareArguments.getRid());return basicReturns.isOk();}//告诉服务器销毁交换机public boolean exchangeDelete(String exchangeName) throws IOException {//1.构造payloadExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();exchangeDeleteArguments.setExchangeName(exchangeName);exchangeDeleteArguments.setChannelId(channelId);exchangeDeleteArguments.setRid(createNewRid());byte[] payload = BinaryTool.toBytes(exchangeDeleteArguments);//2.构造响应Request request = new Request();request.setPayload(payload);request.setLength(payload.length);request.setType(0x4);//3.发送请求connection.writeRequest(request);//4.等待服务器的响应BasicReturns basicReturns = waitBasicReturns(exchangeDeleteArguments.getRid());return basicReturns.isOk();}//告诉服务器创建一个队列public boolean queueDeclare(String queueName,boolean durable,boolean autoDelete,boolean exclusive,Map<String,Object> arguments) throws IOException {//1.构造payloadQueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setRid(createNewRid());queueDeclareArguments.setArguments(arguments);queueDeclareArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(queueDeclareArguments);//2.构造响应Request request = new Request();request.setPayload(payload);request.setLength(payload.length);request.setType(0x5);//3.发送请求connection.writeRequest(request);//4.等待服务器的响应BasicReturns basicReturns = waitBasicReturns(queueDeclareArguments.getRid());return basicReturns.isOk();}//告诉服务器销毁队列public boolean queueDelete(String queueName) throws IOException {//1.构造payloadQueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();queueDeleteArguments.setQueueName(queueName);queueDeleteArguments.setChannelId(channelId);queueDeleteArguments.setRid(createNewRid());byte[] payload = BinaryTool.toBytes(queueDeleteArguments);//2.构造响应Request request = new Request();request.setPayload(payload);request.setLength(payload.length);request.setType(0x6);//3.发送请求connection.writeRequest(request);//4.等待服务器的响应BasicReturns basicReturns = waitBasicReturns(queueDeleteArguments.getRid());return basicReturns.isOk();}//告诉服务器创建一个绑定public boolean queueBind(String queueName,String exchangeName,String bindingKey) throws IOException {//1.构造payloadQueueBindArguments queueBindArguments = new QueueBindArguments();queueBindArguments.setBindingKey(bindingKey);queueBindArguments.setQueueName(queueName);queueBindArguments.setExchangeName(exchangeName);queueBindArguments.setRid(createNewRid());queueBindArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(queueBindArguments);//2.构造响应Request request = new Request();request.setPayload(payload);request.setLength(payload.length);request.setType(0x7);//3.发送请求connection.writeRequest(request);//4.等待服务器的响应BasicReturns basicReturns = waitBasicReturns(queueBindArguments.getRid());return basicReturns.isOk();}//告诉服务器销毁一个绑定public boolean queueUnBind(String queueName,String exchangeName) throws IOException {//1.构造payloadQueueUnBindArguments queueUnBindArguments = new QueueUnBindArguments();queueUnBindArguments.setQueueName(queueName);queueUnBindArguments.setExchangeName(exchangeName);queueUnBindArguments.setRid(createNewRid());queueUnBindArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(queueUnBindArguments);//2.构造响应Request request = new Request();request.setPayload(payload);request.setLength(payload.length);request.setType(0x8);//3.发送请求connection.writeRequest(request);//4.等待服务器的响应BasicReturns basicReturns = waitBasicReturns(queueUnBindArguments.getRid());return basicReturns.isOk();}//告诉服务器发送一个消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {//1.构造payloadBasicPublishArguments basicPublishArguments = new BasicPublishArguments();basicPublishArguments.setBasicProperties(basicProperties);basicPublishArguments.setBody(body);basicPublishArguments.setRoutingKey(routingKey);basicPublishArguments.setChannelId(channelId);basicPublishArguments.setExchangeName(exchangeName);basicPublishArguments.setRid(createNewRid());byte[] payload = BinaryTool.toBytes(basicPublishArguments);//2.构造响应Request request = new Request();request.setPayload(payload);request.setLength(payload.length);request.setType(0x9);//3.发送请求connection.writeRequest(request);//4.等待服务器的响应BasicReturns basicReturns = waitBasicReturns(basicPublishArguments.getRid());return basicReturns.isOk();}//告诉服务器某个消费者要订阅消息public boolean basicConsumer(String queueName, boolean autoAck, Consumer consumer) throws IOException {//首先设置一下回调if(this.consumer != null) {throw new IOException("[Channel] 当前channel中已经设置过回调,不能重复设置");}this.consumer = consumer;//1.构造payloadBasicConsumerArguments basicConsumerArguments = new BasicConsumerArguments();//直接用当前channelId代表消费者basicConsumerArguments.setConsumerTag(channelId);basicConsumerArguments.setRid(createNewRid());basicConsumerArguments.setAutoAck(autoAck);basicConsumerArguments.setChannelId(channelId);basicConsumerArguments.setQueueName(queueName);byte[] payload = BinaryTool.toBytes(basicConsumerArguments);//2.构造响应Request request = new Request();request.setPayload(payload);request.setLength(payload.length);request.setType(0xa);//3.发送请求connection.writeRequest(request);//4.等待服务器的响应BasicReturns basicReturns = waitBasicReturns(basicConsumerArguments.getRid());return basicReturns.isOk();}//告诉服务器手动确认public boolean basicAck(String queueName,String messageId) throws IOException {//1.构造payloadBasicAckArguments basicAckArguments = new BasicAckArguments();basicAckArguments.setMessageId(messageId);basicAckArguments.setQueueName(queueName);basicAckArguments.setRid(createNewRid());basicAckArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(basicAckArguments);//2.构造响应Request request = new Request();request.setPayload(payload);request.setLength(payload.length);request.setType(0xb);//3.发送请求connection.writeRequest(request);//4.等待服务器的响应BasicReturns basicReturns = waitBasicReturns(basicAckArguments.getRid());return basicReturns.isOk();}}
b.Connection类:
该类主要做的工作是读取响应,然后进行返回。
首先该类通过Socket与服务器进行通信,需要传入主机ip和端口号,可以直接在构造方法中实现,在此方法中还维护了一个线程负责不断从Socket中读取响应,并且进行响应类型的判断:
如果为0xc则代表为服务器向消费者推送消息,此时就需要交给线程池进行回调方法的执行,然后进行响应的返回。
如果为0x1-0xb则代表为其他的普通地操作队列的响应,此时只需要进行响应的返回就行。
其次维护了一个ConcurrentHashMap<String,Channel>,表示当前连接里面有哪些channel,后续需要通过这个channel来完成方法的回调和响应的返回。
最后还提供了两个方法,createChannel(创建频道)和writeRequest(写请求,记得flush进行刷新)
具体代码实现:
@Slf4j
public class Connection {private Socket socket;private InputStream inputStream;private OutputStream outputStream;private DataOutputStream dataOutputStream;private DataInputStream dataInputStream;private ExecutorService callbackPool;//保存当前连接里面有哪些channelprivate ConcurrentHashMap<String,Channel> channelMap;public Connection(String host, int port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);channelMap = new ConcurrentHashMap<>();callbackPool = Executors.newFixedThreadPool(4);// 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理.Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {// 连接正常断开的. 此时这个异常直接忽略.log.info("[Connection] 连接正常断开!");} catch (IOException | ClassNotFoundException | MqException e) {log.info("[Connection] 连接异常断开!");e.printStackTrace();}});t.start();}public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new IOException("读取的响应数据不完整!");}response.setPayload(payload);log.info("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());return response;}//当前类用来判断当前接收到的响应是正常的操作消息队列的操作(0x1-0xb)还是服务器推送消息的(0xc)private void dispatchResponse(Response response) throws IOException, ClassNotFoundException {if(response.getType() == 0xc) {SubScribeReturns scribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());Channel channel = channelMap.get(scribeReturns.getConsumerTag());if(channel == null) {throw new IOException("[Connection] 当前要找的channel不存在 channelId: " + scribeReturns.getConsumerTag());}//执行channel内部对象的回调(交给线程池来执行)callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(scribeReturns.getConsumerTag(),scribeReturns.getBasicProperties(),scribeReturns.getBody());}catch (Exception e) {e.printStackTrace();log.info("[Connection] 执行回调过程中出现错误");}});channel.putReturns(scribeReturns);}else {//普通操作消息队列BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());Channel channel = channelMap.get(basicReturns.getChannelId());if(channel == null) {throw new IOException("[Connection] 当前要找的channel不存在 channelId: " + basicReturns.getChannelId());}channel.putReturns(basicReturns);}}//关闭所有连接public void close() {try {callbackPool.shutdownNow();inputStream.close();outputStream.close();socket.close();channelMap.clear();} catch (IOException e) {e.printStackTrace();log.info("[Connection] 连接释放失败");}}//通过该方法在当前Connection中创建一个channelpublic Channel createChannel() throws IOException {//使用UUID生成唯一的身份标识String channelId = "C-" + UUID.randomUUID();Channel channel = new Channel(channelId,this);//通知一下服务器,向服务器发送请求boolean ok = channel.createChannel();if(!ok) {throw new IOException("[Connection] 创建channel失败");}channelMap.put(channelId,channel);return channel;}public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();log.info("[Connection] 发送请求 type: " + request.getType() + ",length: " + request.getLength());}
}
c.BrokerServer类:
在BrokerServer中主要就是接收channel这边发送的请求,然后根据类型调用VirtualHost对应的方法,然后进行响应的构造并且在Connection类中读取响应将响应添加到channel的响应集合中,最终channel里面等待响应结束并进行返回。
具体代码实现:
@Slf4j
public class BrokerServer {private ServerSocket serverSocket;//当前只考虑一个BrokerServer上只有一个虚拟主机private VirtualHost virtualHost = new VirtualHost("default");//使用一个哈希表来表示当前所有的会话(就是当前有哪些客户端正在与服务器进行通信)//key-channelId value-Socket(每一个socket就代表一个客户端)private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();//引入一个线程池,来处理多个客户端的请求private ExecutorService executorService;//引入一个变量,控制当前服务器的是否运行private volatile boolean runnable = true;public BrokerServer(int port) throws IOException {serverSocket = new ServerSocket(port);}//服务器启动public void start() throws IOException {log.info("[BrokerServer] 启动");executorService = Executors.newCachedThreadPool();try {while(runnable) {Socket clientSocket = serverSocket.accept();executorService.submit(() -> {processConnection(clientSocket);});}}catch (SocketException e) {log.info("[BrokerServer] 服务器停止运行");}}//通过这方法来处理一个客户端的连接//但在这个连接中可能设计很多的请求与响应private void processConnection(Socket clientSocket) {try(InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {//由于需要按照特定格式进行数据的读取,所以还需要使用下面的stream类try(DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {//1.读取请求Request request = readRequest(dataInputStream);//2.根据请求计算响应Response response = process(request,clientSocket);//3.返回响应writeResponse(dataOutputStream,response);}}}catch (EOFException | SocketException e) {//如果当前是这个两个异常,代表要么文件读到末尾,要么socket已经关闭//那么此时服务器可以停止了(借助该异常)log.info("[BrokerServer] connection关闭! 客户端的地址: " + clientSocket.getInetAddress().toString() +": " + clientSocket.getPort());} catch (IOException | ClassNotFoundException e) {//如果是该异常,那么是在写读数据过程中出现了问题e.printStackTrace();log.info("[BrokerServer] 出现异常!");}finally {//关闭clientSockettry {clientSocket.close();//一个TCP连接里面有多个channel,还需要清理掉与该socket相关的所有channelclearSocketChannel(clientSocket);} catch (IOException e) {e.printStackTrace();}}}//该方法就是遍历上述哈希表,然后将里面与clientSocket相关联的所有的channel进行删除private void clearSocketChannel(Socket clientSocket) {List<String> toDelete = new LinkedList<>();for(Map.Entry<String,Socket> entry : sessions.entrySet()) {if(entry.getValue().equals(clientSocket)) {toDelete.add(entry.getKey());}}for(String channelId : toDelete) {sessions.remove(channelId);}log.info("[BrokerServer] 删除与socket相关channel成功");}private Request readRequest(DataInputStream dataInputStream) throws IOException, ClassNotFoundException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if(n != request.getLength()) {throw new IOException("[BrokerServer] 数据解析出现错误");}request.setPayload(payload);log.info("[BrokerServer] 读取请求成功 type: " + request.getType() + ",length: " + n);return request;}private Response process(Request request,Socket clientSocket) throws IOException, ClassNotFoundException {//1.首先解析一下当前请求中的payload数据BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());log.info("rid: " + basicArguments.getRid() + ",channelId: " + basicArguments.getChannelId() +",length: " + request.getLength() + ",type: " + request.getType());//2.根据请求中type的类型解析一下当前请求是要干啥int type = request.getType();boolean ok = true;if(type == 0x1) {//创建channelsessions.put(basicArguments.getChannelId(),clientSocket);log.info("[BrokerServer] 创建channel成功 channelId: " + basicArguments.getChannelId());}else if(type == 0x2) {//销毁channelsessions.remove(basicArguments.getChannelId());log.info("[BrokerServer] 销毁channel成功 channelId: " + basicArguments.getChannelId());}else if(type == 0x3) {//创建交换机ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(),arguments.getType(),arguments.isDurable(),arguments.isAutoDelete(),arguments.getArguments());log.info("[BrokerServer] 创建交换机成功 exchangeName: " + arguments.getExchangeName());}else if(type == 0x4) {//销毁交换机ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());log.info("[BrokerServer] 销毁交换机成功 exchangeName: " + arguments.getExchangeName());}else if(type == 0x5) {//创建队列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(),arguments.isDurable(),arguments.isAutoDelete(),arguments.isExclusive(),arguments.getArguments());log.info("[BrokerServer] 创建队列成功 queueName: " + arguments.getQueueName());}else if(type == 0x6) {//销毁队列QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete(arguments.getQueueName());log.info("[BrokerServer] 销毁队列成功 queueName: " + arguments.getQueueName());}else if(type == 0x7) {//创建绑定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(),arguments.getExchangeName(),arguments.getBindingKey());log.info("[BrokerServer] 创建绑定成功 queueName: " + arguments.getQueueName() +",exchangeName: " + arguments.getExchangeName() +",bindingKey: " + arguments.getBindingKey());}else if(type == 0x8) {//销毁绑定QueueUnBindArguments arguments = (QueueUnBindArguments) basicArguments;ok = virtualHost.queueUnBind(arguments.getQueueName(),arguments.getExchangeName());log.info("[BrokerServer] 销毁绑定成功 queueName: " + arguments.getQueueName() +",exchangeName: " + arguments.getExchangeName());}else if(type == 0x9) {//发送消息BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(),arguments.getRoutingKey(),arguments.getBasicProperties(),arguments.getBody());log.info("[BrokerServer] 发送消息成功 exchangeName: " + arguments.getExchangeName() +",routingKey: " + arguments.getRoutingKey());}else if(type == 0xa) {//订阅消息BasicConsumerArguments arguments = (BasicConsumerArguments) basicArguments;ok = virtualHost.basicConsumer(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {//该回调函数的用处是:当有生产者往对应队列中生产了消息,服务器就可以将当前队列中的消息推送给消费者客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {//(1)首先需要知道当前推送的消费者是谁//其实这里的channelId就是当前消费者的一个身份标识,也就是consumerTag//所以就需要从存储的哈希表(sessions)中找到当前消费者所对应的socket,进而将消息给推送回去Socket clientSocket = sessions.get(consumerTag);if(clientSocket == null) {throw new MqException("[BrokerServer] 订阅消息的客户端已断开连接");}//(2)构造payloadSubScribeReturns scribeReturns = new SubScribeReturns();scribeReturns.setBody(body);scribeReturns.setConsumerTag(consumerTag);scribeReturns.setBasicProperties(basicProperties);scribeReturns.setOk(true);//由于这里只有响应没有请求,暂时不需要要rid进行对应scribeReturns.setRid("");scribeReturns.setChannelId(consumerTag);//(3)构造响应Response response = new Response();try {byte[] payload = BinaryTool.toBytes(scribeReturns);response.setPayload(payload);response.setLength(payload.length);response.setType(0xc);//(4)写回当前响应DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream,response);} catch (IOException e) {e.printStackTrace();log.info("[BrokerServer] 序列化失败|写回响应失败");}}});}else if(type == 0xb) {//手动确认BasicAckArguments arguments = (BasicAckArguments)basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(),arguments.getMessageId());log.info("[BrokerServer] 手动确认成功 queueName: " + arguments.getQueueName() +",messageId: " + arguments.getMessageId());}else {throw new MqException("[BrokerServer] 未知的type type: " + type);}//3.构造响应并返回Response response = new Response();BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);log.info("[Response] rid: " + basicArguments.getRid() + ",channelId: " + basicArguments.getChannelId() +",type: " + response.getType() + ",length:" + response.getLength());return response;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());//记得刷新dataOutputStream.flush();}//一般来说,都是直接kill掉进程//但这个写了一个stop方法,主要是为了后续的单元测试public void stop() throws IOException {log.info("[BrokerServer] 停止");runnable = false;executorService.shutdownNow();serverSocket.close();}
}
关于SubscribeReturns响应的返回:
这个响应是在当前的某个队列中有元素并且有消费者(考虑独占的时候),才会调用basicConsume(这里就将channelId作为了consumerTag作为参数传入)中的回调方法(此时这个回调方法是服务器内部回调的执行,只是构造一个0xc的响应,消费者所设定的回调方法还是通过channel中consumer来执行),构造一个类型为0xc的响应,并且此时还会构造一个0xa的响应(用于返回basicConsume这个方法是否执行成功),而0xc这个响应在被Connection中的方法读取到后,会根据响应(SubscribeReturns)中的consumerTag(等同于channelId)来获取到对应得channel,然后让channel中得consumer来执行消费者所设定的回调方法,然后将响应放入channel的响应集合中,从而终止等待,响应返回完毕。
(4)工厂方法:
ConnectionFactory类:
用来创建一个Connecting。
具体代码实现:
@Data
public class ConnectionFactory {private String host;private int port;//访问BrokerServer中的哪个虚拟主机//这几个属性先不考虑//private String virtualHostName;//private String password;//private String userName;public Connection getNewConnection() throws IOException {Connection connection = new Connection(host,port);return connection;}
}
这里没有涉及到用户和密码的校验,由于是单机系统,而RabbitMQ是需要进行用户和密码验证的。
相关文章:
模拟实现消息队列(基于SpringBoot实现)
提要:此处的消息队列是仿照RabbitMQ实现(参数之类的),实现一些基本的操作:创建/销毁交互机(exchangeDeclare,exchangeDelete),队列(queueDeclare,…...
C语言:预编译过程的剖析
目录 一.预定义符号和#define定义常量 二.#define定义宏 三.宏和函数的对比 四、#和##运算符 五、条件编译 在之前,我们已经介绍了.c文件在运行的过程图解,大的方面要经过两个方面。 一、翻译环境 1.预处理(预编译) 2.编译 3…...
算法——单调栈
单调栈: 保持栈内的元素始终递增或递减。 单调递增 待处理数组{1,5,2,5,7,2,8} public void sameyIncrease(int[] nums) {Stack<Integer> stack new Stack<>();for(int i 0; i < nums.length; i) {//当栈空的时候可以直接进栈或者要进栈的数大于…...
LeetCode讲解篇之695. 岛屿的最大面积
文章目录 题目描述题解思路题解代码题目链接 题目描述 题解思路 我们遍历二维矩阵,如果当前格子的元素为1进行深度优先搜索,将搜索过的格子置为0,防止重复访问,然后对继续深度优先搜索上下左右中为1的格子 题解代码 func maxAr…...
招联2025校招内推倒计时
【投递方式】 直接扫下方二维码,或点击内推官网https://wecruit.hotjob.cn/SU61025e262f9d247b98e0a2c2/mc/position/campus,使用内推码 igcefb 投递) 【招聘岗位】 后台开发 前端开发 数据开发 数据运营 算法开发 技术运维 软件测试 产品策…...
vite学习教程01、vite构建vue2
文章目录 前言一、vite初始化项目二、修改配置文件2.1、修改main.js文件2.2、修改App.vue文件2.3、修改helloworld.vue2.4、修改vite.conf.js2.5、修改vue版本--修改package.json文件 三、安装vue2和vite插件四、启动服务资料获取 前言 博主介绍:✌目前全网粉丝3W&…...
强化学习部分代码的注释分析
引言 对一些代码块进行注释。我直接复制过来的,不能运行的话别怪我。 多臂赌博机 代码来自链接。欢迎回到原来的链接学习。 %I thought what Id do was Id pretend I was one of those deaf-mutes,or should I ?clear all; epsilon[0.5,0.2,0.1,0.0…...
ctf.bugku-备份是个好习惯
访问页面得到字符串 这串字符串是重复的; d41d8cd98f00b204e9800998ecf8427e 从前端、源码上看,除了这段字符串,没有其他信息;尝试解密,长度32位;各种解密方式试试; MD5免费在线解密破解_MD5在…...
C++面试速通宝典——14
220. static关键字的作用 static关键字在编程中有多种作用: 在类的成员变量前使用,表示该变量属于类本身,而不是任何类的实例。在类的成员函数前使用,表示该函数不需要对象实例即可调用,且只能访问类的静…...
k8s的简介和部署
一、k8s简介 在部署应用程序的方式上面,主要经历了三个阶段: 传统部署:互联网早期,会直接将应用程序部署在物理机上优点:简单,不需要其它技术的参与缺点:不能为应用程序定义资源使用边界,很难合理地分配计算资源&…...
Thingsboard 网关实战 modbus通信 rpc下发控制指令
我们这里说的是Thingsboard通过网关modbus通信接入设备,然后通过rpc下发指令去控制开关信号的设备,不会网关通过modbus接入设备的,可以看我之前的文章,从小白教学。 下面我们就说如何下发rpc开关信号指令 第一步.在modbus配置文…...
基于pytorch的手写数字识别
import pandas as pd import numpy as np import torch import matplotlib import matplotlib.pyplot as plt from torch.utils.data import TensorDataset, DataLoadermatplotlib.use(tkAgg)# 设置图形配置 config {"font.family": serif,"mathtext.fontset&q…...
MySQL 实验 7:索引的操作
MySQL 实验 7:索引的操作 索引是对数据表中一列或多列的值进行排序的一种结构,索引可以大大提高 MySQL 的检索速度。合理使用索引,可以大大提升 SQL 查询的性能。 索引好比是一本书前面的目录,假如我们需要从书籍查找与 xx 相关…...
为Floorp浏览器添加搜索引擎及搜索栏相关设置. 2024-10-05
Floorp浏览器开源项目地址: https://github.com/floorp-Projects/floorp/ 1.第一步 为Floorp浏览器添加搜索栏 (1.工具栏空白处 次键选择 定制工具栏 (2. 把 搜索框 拖动至工具栏 2.添加搜索引擎 以添加 搜狗搜索 为例 (1.访问 搜索引擎网址 搜狗搜索引擎 - 上网从搜狗开始 (2…...
如何设置WSL Ubuntu在Windows开机时自动启动
如何设置WSL Ubuntu在Windows开机时自动启动 步骤详解1. 创建批处理脚本2. 添加到Windows启动项 注意事项结语 在使用Windows Subsystem for Linux (WSL) 时,我们可能希望Ubuntu能够在Windows启动时自动运行。本文将介绍如何实现这一功能,让您的开发环境更加便捷。 步骤详解 …...
使用TensorBoard可视化模型
目录 TensorBoard简介 神经网络模型 可视化 轮次-损失曲线 轮次-准确率曲线 轮次-学习率曲线 迭代-评估准确率曲线 迭代-评估损失曲线 TensorBoard简介 TensorBoard是一款出色的交互式的模型可视化工具。安装TensorFlow时,会自动安装TensorBoard。如图: TensorFlow可…...
《深度学习》OpenCV 图像拼接 原理、参数解析、案例实现
目录 一、图像拼接 1、直接看案例 图1与图2展示: 合并完结果: 2、什么是图像拼接 3、图像拼接步骤 1)加载图像 2)特征点检测与描述 3)特征点匹配 4)图像配准 5)图像变换和拼接 6&am…...
Hive数仓操作(三)
一、Hive 数据库操作 1. 创建数据库 基本创建数据库命令: CREATE DATABASE bigdata;说明: 数据库会在 HDFS 中以目录的形式创建和保存,数据库名称会存储在 Hive 的元数据中。如果不指定目录,数据库将在 /user/hive/warehouse 下…...
TDSQL-C电商可视化,重塑电商决策新纪元
前言: 在数字化浪潮席卷全球的今天,电子商务行业以其独特的魅力和无限潜力,成为了推动全球经济增长的重要引擎。然而,随着业务规模的急剧扩张,海量数据的涌现给电商企业带来了前所未有的挑战与机遇。如何高效地处理、…...
翔云 OCR:发票识别与验真
在数字化时代,高效处理大量文档和数据成为企业和个人的迫切需求。翔云 OCR 作为一款强大的光学字符识别工具,在发票识别及验真方面表现出色,为我们带来了极大的便利。 一、翔云 OCR 简介 翔云 OCR 是一款基于先进的人工智能技术开发的文字识别…...
PySide6新手必看:从零开始用Python玩转Qt界面开发(附官方教程对比)
PySide6新手必看:从零开始用Python玩转Qt界面开发 在Python生态中,GUI开发一直是个让人又爱又恨的话题。当Tkinter显得过于简陋,而PyQt又面临商业授权困扰时,PySide6作为Qt官方推出的Python绑定,正成为越来越多开发者的…...
多品种小批量时代的排产革命:JVS-APS智能排产突破交付周期瓶颈
"紧急订单插入,全产线排程推倒重来"、"设备冲突、物料短缺让排产计划沦为纸上谈兵"、"明明产能充足,订单交付周期却比同行长30%"——这些困境正在困扰着越来越多的制造企业。在现代制造业中,多品种小批量生产模…...
避开深沟槽工艺的“坑”:从DLTS数据到TCAD仿真的硅光电二极管陷阱态优化实战
硅光电二极管陷阱态优化的工程实践:从DLTS表征到TCAD仿真 在半导体制造领域,深沟槽隔离(DTI)工艺虽然能有效解决器件间的串扰问题,但其引入的界面陷阱态却成为光电二极管性能提升的"隐形杀手"。工艺工程师们…...
Gemma-3-270m多场景落地:政务热线知识库问答、医疗术语解释系统
Gemma-3-270m多场景落地:政务热线知识库问答、医疗术语解释系统 1. 快速上手:部署你的第一个Gemma-3-270m服务 想要快速体验Gemma-3-270m的强大能力?通过Ollama部署只需几个简单步骤。 1.1 环境准备与模型选择 首先确保你已经安装了Ollam…...
【卷积神经网络作业实现人脸的关键点定位功能】
下面是完成这道题目的代码:import os import cv2 import numpy as np import pandas as pd import torch import torch.nn as nn from torch.utils.data import Dataset,DataLoader from torchvision import transforms import matplotlib.pyplot as plt1. 数据集定…...
GLM-OCR模型Node.js环境配置与API服务搭建全指南
GLM-OCR模型Node.js环境配置与API服务搭建全指南 你是不是也遇到过这样的场景?手头有一堆图片需要提取文字,比如扫描的文档、截图或者手机拍的照片。自己手动录入?效率太低。用现成的在线OCR工具?又担心数据安全和调用限制。特别…...
AI 开发实战:实验和试点项目怎么记录,才不会做完就散
AI 开发实战:实验和试点项目怎么记录,才不会做完就散 一、这个问题为什么值得专门拿出来做? 在 AI 工程落地里,真正拖慢团队的往往不是模型本身,而是流程和协作方式没有跟上。 围绕“实验和试点项目怎么记录࿰…...
在Ubuntu 22.04上搞定Gen6D位姿估计:从CUDA 11.8到Pytorch3D 0.7.8的完整环境搭建避坑指南
在Ubuntu 22.04上构建Gen6D位姿估计开发环境的全流程解析 计算机视觉领域的位姿估计技术正在重塑增强现实与机器人导航的边界。Gen6D作为香港大学团队开源的前沿项目,其无需CAD模型的特性为物体位姿识别提供了新思路。本文将彻底拆解Ubuntu 22.04环境下从驱动层到算…...
CYBER-VISION零号协议互联网舆情智能监测与分析系统
CYBER-VISION零号协议:构建你的互联网舆情智能监测雷达 最近和几个做市场、公关的朋友聊天,他们都在抱怨同一个问题:每天花大量时间刷新闻、看社交媒体,就为了捕捉行业动态和用户反馈,生怕错过什么重要信息。人工监测…...
STM32F407的RTC时钟不准?手把手教你用CubeMX配置LSE晶振校准(附源码)
STM32F407的RTC时钟不准?手把手教你用CubeMX配置LSE晶振校准(附源码) 在嵌入式系统开发中,实时时钟(RTC)的精度问题常常让开发者头疼。特别是使用STM32F407这类主流单片机时,即使按照官方文档配…...
