项目实战 — 消息队列(4){消息持久化}
目录
一、消息存储格式设计
🍅 1、queue_data.txt:保存消息的内容
🍅 2、queue_stat.txt:保存消息的统计信息
二、消息序列化
三、自定义异常类
四、创建MessageFileManger类
🍅 1、约定消息文件所在的目录和文件名字
🍅 2、队列的统计信息
🍅 3、创建队列对应的目录和功能
🍅 4、实现删除队列的文件和目录
🍅 5、检查队列的目录和文件是否都存在
🍅 6、把消息写入到文件中
🍅 7、删除文件中的消息
🍅 8、将硬盘中的数据加载到内存中
🍅 9、实现消息文件的垃圾回收
🎈 检测是否要进行GC
🎈 构造新目录
🎈 进行GC操作
五、测试MessageFileManager类
🍅 1、“准备工作”和“收尾工作”
🍅 2、测试创建文件是否存在
🍅 3、测试writetStat和readStat是否能够通过
🍅 4、测试sendMessage
🍅 5、测试删除消息
🍅 6、测试垃圾回收
六、小结
一、消息存储格式设计
对于消息,并不打算存储在数据库中:
(1)消息操作并不会涉及到复杂的增删改查
(2)消息的数量可能会非常多,数据库的访问效率并不高
所以,我们直接把消息存储在文件中。
那么消息要如何在文件中存储呢?
首先消息,它是依附于队列的,所以在存储的时候,就把消息按照队列的维度展开。
我们会将队列存储在和数据库同级的data目录中,在data中创建一些子目录,子目录的名字就是队列名。
然后在每个队列的子目录下面,再分配两个文件,用来存储消息。主要是以下两个文件。
🍅 1、queue_data.txt:保存消息的内容
Message是一个二进制格式的文件,包含若干个消息,每个消息都以二进制的方式存储,每个消息都由这几个部分构成:Message对象序列化之后。
Messag对象,会分别再内存和硬盘上都记录一份。内存中的衣服呢会记录offsetBegin和offsetEnd。这样就可以找到内存中的Message对象,能够找到对应的硬盘上的message对象。
关于isValid:是用来标识当前消息在文件中是否有效,为1就是有效的消息,为0就是无效的消息。当为0时就相当于逻辑上的删除消息功能。但是,随着时间的推移,消息的增多,那么该消息可能就会大部分都是无效的消息,针对这种情况,就需要对当前消息的文件,进行垃圾回收。
以下是本程序中实现的垃圾回收功能:
垃圾回收(GC):使用复制算法,针对消息数据文件中的垃圾回收进行回收。直接遍历原有的消息数据文件,把有效的数据拷贝到一个新的文件中,然后把之前的旧文件都删除掉(逻辑删除)。
作出约定(可以不按这个来),当总消息数目超过了2000,并且有效的消息数目低于总数目的50%,就出发一次垃圾回收。
对于RabbitMQ解决垃圾回收的方式如下:
如果某个消息队列中,消息很多,都是有效消息,就会导致整个消息的数据文件特别答,后续针对文件的各种操作,成本就会很高。RabbitMQ解决方案如下:
文件拆分:当单个文件长度达到一定阈值以后,就会拆分成两个文件(拆分次数越多文件就越多)
文件合并:每个单独的文件都会进行GC,如果GC之后,就会发现文件变小很多,当小到一定程度,就会和其他文件合并。
这样就会再消息特别多的时候,也能保证性能上的及时响应。
🍅 2、queue_stat.txt:保存消息的统计信息
使用这个文件,来保存消息的统计信息。
只存一行数据,文本格式。然后一行包括两列:两者使用 \t 来分割,形如2000\t500
第一列是queue_data.txt中总的消息的数目
第二列是queue_data.txt中有效消息的数目。
二、消息序列化
消息序列化:把一个对象(结构化数据)转成一个字符串/字节数组。
在序列化之后,对象的信息是不会丢失的,这样就会方便与存储和传输(在文件中存储时,只能以字符串/二进制数据的方式存储对象)。后面需要用的时候,就再反序列化。
由于消息的body是二进制数据,所以这里不会使用JSON进行序列化。
针对二进制序列化,有很多解决方案:
(1)Java标准库中提供了序列化的方案:ObjectInputStream 和 ObjectOutputStream
(2)Hession
(3)protobuffer
(4)thrift
这里使用第一种,这样就不用再引入额外的依赖。
在commen中创建一个BinaryTool,因为后面客户端还会用到这个类,所以放在公共的包中。
/*
* 序列化和反序列化
* 实现Serializable接口才能让这个对象进行序列化和反序列化
* */
public class BinaryTool {// 把一个对象序列化成一个字节数组public static byte[] toBytes(Object object) throws IOException {
// 这个流对象相当于一个边长的字节数组
// 就可以把object序列化的数组逐渐写入到byteArrayOutputStream中,然后统一转成byte[]try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
// 此处的writeObject就会把该对象进行序列化,生成二进制字节数据,就会写入到ObjectOutputStream中
// 由于ObjectOutputStream又关联到了ByteArrayOutputStream,最终结果就写入到ByteArrayOutputStream里面了objectOutputStream.writeObject(object);}
// 该操作就是把byteArrayOutputStream中持有的二进制数据取出来,转成byte[]return byteArrayOutputStream.toByteArray();}}// 把一个字节数组,反序列化成一个对象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
// 此处的readObject,就是从data的byte[]中读取数据并且进行反序列化object = objectInputStream.readObject();}}return object;}
}
三、自定义异常类
自定义一个异常类,如果是mq的业务逻辑中出的异常,就抛出这个异常类
/*
*自定义异常类
*/
public class MqException extends Exception{public MqException(String reason){super(reason);}
}
四、创建MessageFileManger类
对硬盘上的消息进行管理的类。
🍅 1、约定消息文件所在的目录和文件名字
//这里的init()方法暂时不用,只是列在这,后面可能扩展
public void init(){
// 后续扩展}// 约定的那个消息文件所在的目录和文件名// 用来获取到指定队列对应的消息文件所在的路径private String getQueueDir(String queueName){return "./data/" + queueName;}// 用来获取该队列的消息数据文件路径private String getQueueDataPath(String queueName){return getQueueDir(queueName) + "/queue_data.txt";}// 用来获取该队列列的消息统计文件路径private String getQueueStatPath(String queueName){return getQueueDir(queueName) + "/queue_stat.txt";}
🍅 2、队列的统计信息
定义一个内部类,来表示该队列的统计信息:
// 定义一个内部类,来表示该队列的统计信息static public class Stat{
// 直接定义成publicpublic int totalCount; //总消息数量public int validCount;//有效消息的数量}
然后实现消息统计的读写功能:
private Stat readStat(String queueName){
// 由于当前的消息统计文件是文本文件,直接使用scanner读取文件内容Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}private void writeStat(String queueName,Stat stat){
// 使用PrinWrite来写
// OutputStream打开文件,默认情况下,会直接把源文件清空,新数据会覆盖原数据
// 这里直接覆盖就可以了try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}
🍅 3、创建队列对应的目录和功能
// 创建队列对应的文件和目录public void createQueueFiles(String queueName) throws IOException {
// 1.创建队列对应的消息目录File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()){
// 不存在,就创建这个目录boolean ok = baseDir.mkdirs();if (!ok){throw new IOException("创建目录失败!baseDir = " + baseDir.getAbsolutePath());}}// 2.创建队列数据文件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()){boolean ok = queueDataFile.createNewFile();if (!ok){throw new IOException("创建文件失败!queueDataFile = " + queueDataFile.getAbsolutePath());}}
// 3.创建消息统计文件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()){boolean ok = queueStatFile.createNewFile();if (!ok){throw new IOException("创建文件失败!queueStatFile = " + queueStatFile.getAbsolutePath());}}// 4.给消息统计文件,设定初始值 0\t0Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName,stat);}
🍅 4、实现删除队列的文件和目录
// 实现删除队列的文件和目录public void destroyQueueFiles(String queueName) throws IOException {
// 先删除文件,在删除目录File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3){
// 有任意一个删除失败,就算整体删除失败throw new IOException("删除队列目录和文件失败!baseDir = " + baseDir.getAbsolutePath());}}
🍅 5、检查队列的目录和文件是否都存在
后续有生产者给brocker server生产消息了,这个消息就可能需要记录到文件上(取决消息是否要持久化)。但是判断是否要持久化之前,需要检查队列中的文件是否存在。
// 检查队列的目录和文件是否存在public boolean checkFileExists(String queueName){
// 判断队列的数据文件和统计文件是否都存在File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()){return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()){return false;}return true;}
🍅 6、把消息写入到文件中
把一个新的消息,放到队列对应的文件中。主要包含以下三步:
(1)检查写入的队列是否存在;
(2)把Message对象进行序列化,转成二进制的字节数组;
(3)获取到当前数据的文件长度,使用[offsetBegin,offsetEnd]。把新的Message数据,写入到队列数据文件的末尾,此时,
Message对象的offsetBegin,就是当前文件长度 +4
offsetEnd就是当前文件长度 + 4 + message自身长度
(4)写入消息到数据文件,追加到数据文件末尾
(5)更新消息统计文件
写入文件时的线程安全问题:
* 如果两个线程,是往同一个队列中写消息,此时就需要阻塞等待;
假设现在有两个线程t1,t2。如果没有加锁,那么他们的目的就是将一个message写入到104~124之间去。但是,此时可能就会导致t1计算长度以后,没有进行写文件;t2就开始计算长度了,并且执行了写文件操作,写完以后,t1才开始写,但是此时t1就不是从104写了,而是从124开始写。这样会导致queue_data多出一段。
所以这里我们就需要对队列进行加锁。
* 如果两个线程,需要往不同的队列中些消息,此时就不需要阻塞等待。
总体代码如下:
// 该方法用于把一个新的消息,放到队列对应的文件中
// queue表示要把消息写入的队列,message则是要写的消息public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
// 1、检查要写入的队列是否存在
// 如果不存在if (!checkFileExists(queue.getName())){throw new MqException("[MessageFileManager] 队列对应的文件不存在!queueName = " + queue.getName());}// 2、把Message对象进行序列化,转成二进制的字节数组byte[] messageBinary = BinaryTool.toBytes(message);// 这个锁是,当有两个对象针对同一个对象操作时,锁才会有效synchronized (queue){
// 3、先获取到当前的队列数据文件长度,使用[offsetBegin,offsetEnd]
// 把新的Message数据,写入到队列数据文件的末尾,此时Message对象的offsetBegin,就是当前文件长度+4
// offsetEnd就是当前文件长度 + 4 + message自身长度File queueDataFile = new File(getQueueDataPath(queue.getName()));
// 获取到文件的长度:queueDataFile.length();单位字节message.setOffsetBegin(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 4.写入消息到数据文件,追加到数据文件末尾try(OutputStream outputStream = new FileOutputStream(queueDataFile,true)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
// 先写当前消息的长度,占据4个字节
// writeInt()方法用于将给定的整数值作为4个字节(即32位)写入基本DataOutputStream,并且成功执行时变量计数器加4。dataOutputStream.writeInt(messageBinary.length);
// 写入消息本体dataOutputStream.write(messageBinary);}}// 5.更新消息统计文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(),stat);}}
🍅 7、删除文件中的消息
这里就是逻辑删除:将isValid设置为0.
主要分为3步:
(1)把文件中需要删除的一段数据读出来,
(2)还原回Message对象(反序列化);
(3)把isValid改为0;
(4)将上面的数据又写回到文件。
(5)更新统计文件
这里的message对象,必须要包含offsetBegin和offsetEnd。因为这里是对文件中指定的位置进行读写的(把这个随机访问)。随机访问用到的类RandomAcessFile。
关于RandomAcessFile.seek()是用于设置文件指针(相当于光标)位置,设置后,光标会从当前指针的下一位读取到或写入到。
// 删除消息的方法public void deleteMessage(MSGQueue queue,Message message) throws IOException,ClassNotFoundException{synchronized (queue){try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()),"rw")){
// 1.先从文件中读取对应的Message数据byte[] bufferSrc = new byte[(int)(message.getOffsetEnd() - message.getOffsetBegin())];randomAccessFile.seek(message.getOffsetBegin());randomAccessFile.read(bufferSrc);
// 2.把当前读出来的二进制数据,转回成Message对象Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
// 3.把isValid设置为无效diskMessage.setIsValid((byte) 0x0);
// 4.重新写入文件byte[] buffserDest = BinaryTool.toBytes(diskMessage);
// 这里还需要设置光标的位置,因为,上面的光标已经随着读出数据而发生了改变,已经走到了下一条message的offsetBegin,
// 这里为了重新写入数据到文件中,就需要将光标移到对应的位置上面randomAccessFile.seek(message.getOffsetBegin());randomAccessFile.write(buffserDest);
// 5.统计文件-1
// 因为有一条数据无效了Stat stat = readStat(queue.getName());if (stat.validCount > 0){stat.validCount -= 1;}writeStat(queue.getName(),stat);}}}
🍅 8、将硬盘中的数据加载到内存中
将数据从文件中,读取出所有的消息内容,加载到内存当中(放到一个链表中),这个方法会在程序启动的时候调用,主要又以下几步
1.读取当前消息的长度;
2.按照该长度,读取消息内容;
3.将读取到的二进制数据,反序列化回Message对象
4.判断消息对象是不是无效对象
4.将有效的Message对象插入到链表中
// 将数据从文件中,读取出所有的消息内容,加载到内存当中(放到一个链表中)
// 由于该方法实在程序启动时调用, 此时服务器还不能处理请求,不涉及线程操作文件.public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException,MqException,ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try(InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){try(DataInputStream dataInputStream = new DataInputStream(inputStream)){
// 该变量记录当前文件光标位置,初始位置为0long currentOffset = 0;
// 一个文件中包含了很多消息,这里要循环读取while (true){
// 1.读取当前消息的长度,这里可能会读到文件末尾
// reaIn()方法读到文件末尾,会抛出EOFException异常
// readInt()读取出4个字节int messageSize = dataInputStream.readInt();
// 2.按照这个长度,读取消息内容
// buffer是一个盛放消息容器,和消息的长度一般大小byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize){
// 如果不匹配,说明文件有问题,格式错乱了throw new MqException("[MessageFileManager] 文件格式错误!queueName = " + queueName);}
// 3.将读取到的二进制数据,反序列化回Message对象Message message = (Message) BinaryTool.fromBytes(buffer);// 4.判定这个消息对象是不是无效对象if(message.getIsValid() != 0x1){
// 无效数据,直接跳过
// 虽然消息是无效消息,但是offset要更新currentOffset += (4 + messageSize);continue;}
// 有效数据,则需要把这个Message对象加入到链表中,加入前还需要填写offsetBegin和offsetEnd;
// 进行offset的时候,需要知道当前光标的位置,由于当下使用的DataInputStream并不方便计算光标位置
// 因此这里手动计算文件光标位置message.setOffsetBegin(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e){System.out.println("[MessageFileManager]恢复Message数据完成");}}return messages;}
🍅 9、实现消息文件的垃圾回收
为什么要实现垃圾回收?(GC)
由于当前会不停的往消息文件中写入新消息,而且删除也只是逻辑删除(isValid),这样就可能导致消息文件越来越大,并且里面又包含了大量的无效消息。
此处的垃圾回收,使用的是复制算法:
判断,当文件中消息总数超过了2000,并且有效消息的数目不足50%时,就触发垃圾回收。然后将文件中有效的消息复制出来,单独写入到一个新的文件中,删除旧文件,使用新文件代替。
🎈 检测是否要进行GC
检查当前是否要针对该队列的消息数据文件进行GC,判断是否要GC,根据总消息数目和有效消息数目判断。
// 检查当前是否要针对该队列的消息数据文件进行GCpublic boolean checkGC(String queueName){
// 判断是否要GC,根据总消息数目和有效消息数目,这两个值都是在消息统计文件中实现的。Stat stat = readStat(queueName);if (stat.totalCount > 2000 && (double)stat.validCount / (double) stat.totalCount < 0.5){return true;}return false;}
🎈 构造新目录
构造一个新目录,放置有效的复制信息。
// 构造一个目录结构放置复制的信息private String getQueueDataNewPath(String queueName){return getQueueDir(queueName) + "/queue_data_new.txt";}
🎈 进行GC操作
执行消息数据文件的垃圾回收操作,使用复制算法完成,主要分为以下几步:
(1)创建一个新的文件queue_data_new.txt
(2)从旧文件中读取出所有的有效消息对象
(3)把有效消息写入到queue_data_new.txt
(4)删出旧的文件,并把新的文件重命名(queue_data_new.txt => queue_data.txt)
(5)更新统计文件
public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
// 进行gc的时候,是针对消息数据文件作出整体性的一个操作,在这个过程中,
// 进行加锁操作,让其他线程不能对该队列的消息文件作出任何修改synchronized (queue) {// 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间.long gcBegin = System.currentTimeMillis();// 1. 创建一个新的文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if (queueDataNewFile.exists()) {// 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName=" + queue.getName());}boolean ok = queueDataNewFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());}// 2. 从旧的文件中, 读取出所有的有效消息对象了. (这个逻辑直接调用上述方法即可, 不必重新写了)LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());// 3. 把有效消息, 写入到新的文件中.try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer = BinaryTool.toBytes(message);// 先写四个字节消息的长度dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}// 4. 删除旧的数据文件, 并且把新的文件进行重命名File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if (!ok) {throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 把 queue_data_new.txt => queue_data.txtok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 5. 更新统计文件Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(), stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time="+ (gcEnd - gcBegin) + "ms");}}
五、测试MessageFileManager类
🍅 1、“准备工作”和“收尾工作”
创建MessagefileManagerTests
@SpringBootTest
public class MessageFileManagerTests {private MessageFileManger messageFileManger = new MessageFileManger();private static final String queueName1 = "testQueue1";private static final String queueName2 = "testQueue2";// 每个用例执行之前的准备工作@BeforeEachpublic void setUp() throws IOException {
// 准备阶段,创建出两个队列,以备后用messageFileManger.createQueueFiles(queueName1);messageFileManger.createQueueFiles(queueName2);}// 每个用例执行之后的收尾工作@AfterEachpublic void tearDown() throws IOException {
// 收尾阶段,把创建出的队列销毁掉messageFileManger.destroyQueueFiles(queueName1);messageFileManger.destroyQueueFiles(queueName2);}
}
🍅 2、测试创建文件是否存在
@Testpublic void testCreateFiles(){
// 创建队列文件在准备工作已经执行过了,这里主要是为了验证文件是否存在File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");
// assertEquals(预期值,实际值)Assertions.assertEquals(true,queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");Assertions.assertEquals(true,queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");Assertions.assertEquals(true,queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");Assertions.assertEquals(true,queueStatFile2.isFile());}
这里为了方便查看文件是否创建,就把收尾工作注释掉了
🍅 3、测试writetStat和readStat是否能够通过
@Testpublic void testReadWriteStat(){MessageFileManger.Stat stat = new MessageFileManger.Stat();stat.totalCount = 100;stat.validCount =50;// 由于writeStat和readStat是私有方法,此处就需要使用反射的方式
// 使用Spring封装好的反射的工具类ReflectionTestUtils.invokeMethod(messageFileManger,"writeStat", queueName1,stat);// 写入完毕之后,调用读取,验证读取的结果和写入的数据是一致的MessageFileManger.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManger,"readStat",queueName1);Assertions.assertEquals(100,newStat.totalCount);Assertions.assertEquals(50,newStat.validCount);System.out.println("writetStat和readStat测试通过");}
🍅 4、测试sendMessage
构造创建queue和message的方法:
private MSGQueue createTestQueue(String queueName){MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true); //是否要持久化return queue;}
// 构造出一条消息private Message createTestMessage(String content){Message message = Message.createMessageWithId("testRoutingKey",null,content.getBytes());return message;}
测试sendMessage:
@Testpublic void testSendMessage() throws IOException, MqException, ClassNotFoundException {
// 构造出消息,并且构造出队列Message message = createTestMessage("testMessage");
// 创建queue对象MSGQueue queue = createTestQueue(queueName1);// 调用发送消息的方法messageFileManger.sendMessage(queue,message);// 检查stat文件MessageFileManger.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManger,"readStat",queueName1);Assertions.assertEquals(1,stat.totalCount);Assertions.assertEquals(1,stat.validCount);// 检查文件,把消息读出来LinkedList<Message> messages = messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(1,messages.size());Message curMessage = messages.get(0);Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());Assertions.assertEquals(message.getDeliverMode(),curMessage.getDeliverMode());// 比较两个字节数组的内容是否相同,不能直接使用asserEqualsAssertions.assertArrayEquals(message.getBody(),curMessage.getBody());System.out.println("message = "+ curMessage);}
构造100条消息, 并且读取出来
@Testpublic void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {
// 往队列中插入100条消息,验证100条消息从文件中读取之后,是否和最初是一致的MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessge" + 1);messageFileManger.sendMessage(queue,message);expectedMessages.add(message);}// 读取所有消息LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(expectedMessages.size(),actualMessages.size());for (int i = 0; i < expectedMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "]actualMessage = " + actualMessages);Assertions.assertEquals(expectedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(),actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());}}
🍅 5、测试删除消息
// 测试删除消息@Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {// 创建队列, 写入 10 个消息. 删除其中的几个消息. 再把所有消息读取出来, 判定是否符合预期.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 删除其中的三个消息messageFileManager.deleteMessage(queue, expectedMessages.get(7));messageFileManager.deleteMessage(queue, expectedMessages.get(8));messageFileManager.deleteMessage(queue, expectedMessages.get(9));// 对比这里的内容是否正确.LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(7, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}
🍅 6、测试垃圾回收
@Testpublic void testGC() throws IOException, MqException, ClassNotFoundException {// 先往队列中写 100 个消息. 获取到文件大小.// 再把 100 个消息中的一半, 都给删除掉(比如把下标为偶数的消息都删除)// 再手动调用 gc 方法, 检测得到的新的文件的大小是否比之前缩小了.MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 获取 gc 前的文件大小File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long beforeGCLength = beforeGCFile.length();// 删除偶数下标的消息for (int i = 0; i < 100; i += 2) {messageFileManager.deleteMessage(queue, expectedMessages.get(i));}// 手动调用 gcmessageFileManager.gc(queue);// 重新读取文件, 验证新的文件的内容是不是和之前的内容匹配LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {// 把之前消息偶数下标的删了, 剩下的就是奇数下标的元素了.// actual 中的 0 对应 expected 的 1// actual 中的 1 对应 expected 的 3// actual 中的 2 对应 expected 的 5// actual 中的 i 对应 expected 的 2 * i + 1Message expectedMessage = expectedMessages.get(2 * i + 1);Message actualMessage = actualMessages.get(i);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}// 获取新的文件的大小File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");long afterGCLength = afterGCFile.length();System.out.println("before: " + beforeGCLength);System.out.println("after: " + afterGCLength);Assertions.assertTrue(beforeGCLength > afterGCLength);}
六、小结
MessageFileManager主要是负责管理消息在文件中的存储:
(1)设计了目录结构和文件格式
(2)实现了目录创建和删除
(3)实现了统计文件的读写
(4)实现了消息的写入
(5)实现了消息的删除
(6)实现了加载所有消息
(7)垃圾回收
相关文章:

项目实战 — 消息队列(4){消息持久化}
目录 一、消息存储格式设计 🍅 1、queue_data.txt:保存消息的内容 🍅 2、queue_stat.txt:保存消息的统计信息 二、消息序列化 三、自定义异常类 四、创建MessageFileManger类 🍅 1、约定消息文件所在的目录和文件名…...

AI编程工具Copilot与Codeium的实测对比
csdn原创谢绝转载 简介 现在没有AI编程工具,效率会打一个折扣,如果还没有,赶紧装起来. GitHub Copilot是OpenAi与github等共同开发的的AI辅助编程工具,基于ChatGPT驱动,功能强大,这个没人怀疑…...

webpack基础知识六:说说webpack的热更新是如何做到的?原理是什么?
一、是什么 HMR全称 Hot Module Replacement,可以理解为模块热替换,指在应用程序运行过程中,替换、添加、删除模块,而无需重新刷新整个应用 例如,我们在应用运行过程中修改了某个模块,通过自动刷新会导致…...

Linux从安装到实战 常用命令 Bash常用功能 用户和组管理
1.0初识Linux 1.1虚拟机介绍 1.2VMware Workstation虚拟化软件 下载CentOS; 1.3远程链接Linux系统 &FinalShell 链接finalshell半天没连接进去 他说ip adress 看IP地址是在虚拟机上 win11主机是 终端输入: ifconfig VMware虚拟机的设置 & ssh连接_snge…...

webpack基础知识三:说说webpack中常见的Loader?解决了什么问题?
一、是什么 loader 用于对模块的"源代码"进行转换,在 import 或"加载"模块时预处理文件 webpack做的事情,仅仅是分析出各种模块的依赖关系,然后形成资源列表,最终打包生成到指定的文件中。如下图所示&#…...

深度学习:Pytorch常见损失函数Loss简介
深度学习:Pytorch常见损失函数Loss简介 L1 LossMSE LossSmoothL1 LossCrossEntropy LossFocal Loss 此篇博客主要对深度学习中常用的损失函数进行介绍,并结合Pytorch的函数进行分析,讲解其用法。 L1 Loss L1 Loss计算预测值和真值的平均绝对…...

【Android-java】Parcelable 是什么?
Parcelable 是 Android 中的一个接口,用于实现将对象序列化为字节流的功能,以便在不同组件之间传递。与 Java 的 Serializable 接口不同,Parcelable 的性能更高,适用于 Android 平台。 要实现 Parcelable 接口,我们需…...

Spring整合MyBatis小实例(转账功能)
实现步骤 一,引入依赖 <!--仓库--><repositories><!--spring里程碑版本的仓库--><repository><id>repository.spring.milestone</id><name>Spring Milestone Repository</name><url>https://repo.spring.i…...

List集合的对象传输的两种方式
说明:在一些特定的情况,我们需要把对象中的List集合属性存入到数据库中,之后把该字段取出来转为List集合的对象使用(如下图) 自定义对象 public class User implements Serializable {/*** ID*/private Integer id;/*…...

海外媒体发稿:软文写作方法方式?一篇好的软文理应合理规划?
不同种类的软文会有不同的方式,下面小编就来来给大家分析一下: 方法一、要选定文章的突破点: 所说突破点就是这篇文章文章软文理应以什么样的视角、什么样的见解、什么样的语言设计理念、如何文章文章的标题来写。不同种类的传播效果&#…...

【秋招】算法岗的八股文之机器学习
目录 机器学习特征工程常见的计算模型总览线性回归模型与逻辑回归模型线性回归模型逻辑回归模型区别 朴素贝叶斯分类器模型 (Naive Bayes)决策树模型随机森林模型支持向量机模型 (Support Vector Machine)K近邻模型神经网络模型卷积神经网络(CNN)循环神经…...

为什么list.sort()比Stream().sorted()更快?
真的更好吗? 先简单写个demo List<Integer> userList new ArrayList<>();Random rand new Random();for (int i 0; i < 10000 ; i) {userList.add(rand.nextInt(1000));}List<Integer> userList2 new ArrayList<>();userList2.add…...

SQL账户SA登录失败,提示错误:18456
错误代码 18456 表示 SQL Server 登录失败。这个错误通常表示提供的凭据(用户名和密码)无法成功验证或者没有权限访问所请求的数据库。以下是一些常见的可能原因和解决方法: 1.错误的凭据:请确认提供的SA账户的用户名和密码是否正…...

Linux 终端操作命令(1)
Linux 命令 终端命令格式 command [-options] [parameter] 说明: command:命令名,相应功能的英文单词或单词的缩写[-options]:选项,可用来对命令进行控制,也可以省略parameter:传给命令的参…...

java与javaw运行jar程序
运行jar程序 一、java.exe启动jar程序 (会显示console黑窗口) 1、一般用法: java -jar myJar.jar2、重命名进程名称启动: echo off copy "%JAVA_HOME%\bin\java.exe" "%JAVA_HOME%\bin\myProcess.exe" myProcess -jar myJar.jar e…...

安装和配置 Home Assistant 教程 HACS Homkit 米家等智能设备接入
安装和配置 Home Assistant 教程 简介 Home Assistant 是一款开源的智能家居自动化平台,可以帮助你集成和控制各种智能设备,从灯光到温度调节器,从摄像头到媒体播放器。本教程将引导你如何在 Docker 环境中安装和配置 Home Assistant&#…...

解决 Android Studio 的 Gradle 面板上只有关于测试的 task 的问题
文章目录 问题描述解决办法 笔者出问题时的运行环境: Android Studio Flamingo | 2022.2.1 Android SDK 33 Gradle 8.0.1 JDK 17 问题描述 笔者最近发现一个奇怪的事情。笔者的 Android Studio 的 Gradle 面板上居然除了用于测试的 task 之外,其它什…...

安全杂记 - 复现nodejs沙箱绕过
目录 一. 配置环境1.下载nodejs2.nodejs配置3.报错解决方法 二. nodej沙箱绕过1. vm模块2.使用this或引用类型来进行沙箱绕过 一. 配置环境 1.下载nodejs 官网:https://nodejs.org/en2.nodejs配置 安装nodejs的msi文件,默认配置一直下一步即可&#x…...

信息安全事件分类分级指南
范围 本指导性技术文件为信息安全事件的分类分级提供指导,用于信息安全事件的防范与处置,为事前准备、事中应对、事后处理 提供一个基础指南,可供信息系统和基础信息传输网络的运营和使用单位以及信息安全主管部门参考使用。 术语和定义 下…...

Vue系列第八篇:echarts绘制柱状图和折线图
本篇将使用echarts框架进行柱状图和折线图绘制。 目录 1.绘制效果 2.安装echarts 3.前端代码 4.后端代码 1.绘制效果 2.安装echarts // 安装echarts版本4 npm i -D echarts4 3.前端代码 src/api/api.js //业务服务调用接口封装import service from ../service.js //npm …...

SQL-每日一题【1164. 指定日期的产品价格】
题目 产品数据表: Products 写一段 SQL来查找在 2019-08-16 时全部产品的价格,假设所有产品在修改前的价格都是 10 。 以 任意顺序 返回结果表。 查询结果格式如下例所示。 示例 1: 解题思路 1.题目要求我们查找在 2019-08-16 时全部产品的价格,假设所…...

memcpy、memmove、memcmp、memset函数的作用与区别
一、memcpy与memmove 1、memcpy 作用:从source的位置开始向后复制num个字节的数据到destination的内存位置。 注意: memcpy() 函数在遇到 ’\0’ 的时候不会停下来(strcpy字符串拷贝函数在遇到’\0’的时候会停下来);destination和source…...

socket 到底是个啥
我相信大家在面试过程中或多或少都会被问到这样一个问题:你能解释一下什么是 socket 吗 我记得我当初的回答很是浅显:socket 也叫套接字,用来负责不同主机程序之间的网络通信连接,socket 的表现方式由四元组(ip地址&am…...

奥威BI—数字化转型首选,以数据驱动企业发展
奥威BI系统BI方案可以迅速构建企业级大数据分析平台,可以将大量数据转化为直观、易于理解的图表和图形,推动和促进数字化转型的进程,帮助企业更好地了解自身的运营状况,及时发现问题并采取相应的措施,提高运营效率和质…...

vue中swiper使用
1.引包 说明:导入相应js引css import "Swiper" from "swiper" import "swiper/css/swiper.css"; import "swiper/js/swiper"; 2.结构 说明:必要的结构使用;直接封装成一个组件 <template>…...

webpack与vite区别
webpack和Vite作为两种常用的前端构建工具,主要有以下几点区别: 构建速度 webpack采用“打包”的方式构建,需要将所有模块打包成几个大的bundle文件,构建速度较慢。 Vite采用了“按需编译”的方式,只在浏览器请求时才编译对应模块,启动速度更快。 dev server webpack dev s…...

GLSL用于图像处理
Pipeline 硬件处理顶点和片段的Pipeline 软件的输入 顶点着色器 顶点的glsl 输入–特殊全局变量 变量 类型 指定函数 描述 gl_ Vertex vec4 glVertex 顶点的全局空间坐标 gl_Color vec4 glColor 主颜色值 gl_SecondaryColor vec4 glSecondaryColor 辅助颜色值 gl_Normal …...

即将发布的 Kibana 版本可运行 Node.js 18
作者:Thomas Watson Kibana 构建在 Node.js 框架之上。 为了确保每个 Kibana 版本的稳定性和使用寿命,我们始终将捆绑的 Node.js 二进制文件保持为最新的最新长期支持 (LTS) 版本。 当 Node.js 版本 18 升级到 LTS 时,我们开始将 Kibana 升级…...

基于遗传算法改进的支持向量机多分类仿真,基于GA-SVM的多分类预测,支持相机的详细原理
目录 背影 支持向量机SVM的详细原理 SVM的定义 SVM理论 遗传算法的原理及步骤 SVM应用实例,基于遗传算法优化SVM的多分类预测 完整代码包括SVM工具箱:https://download.csdn.net/download/abc991835105/88175549 代码 结果分析 展望 背影 多分类预测对现代智能化社会拥有重…...

MySQL5.7源码编译Debug版本
编译环境Ubuntu22.04LTS 1 官方下载MySQL源码 https://dev.mysql.com/downloads/mysql/?spma2c6h.12873639.article-detail.4.68e61a14ghILh5 2 安装基础软件 cmakeclangpkg-configperl 参考:https://dev.mysql.com/doc/refman/5.7/en/source-installation-prere…...