RocketMQ源码(24)—DefaultMQPushConsumer延迟消息源码
基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer延迟消息源码。
文章目录
- 1 load加载延迟消息数据
- 1.1 parseDelayLevel解析延迟等级
- 2 start启动调度消息服务
- 3 DeliverDelayedMessageTimerTask投递延迟消息任务
- 3.1 executeOnTimeup执行延迟消息投递
- 3.2 scheduleNextTimerTask下一个调度任务
- 3.3 correctDeliverTimestamp校验投递时间
- 3.4 messageTimeup恢复正常消息
- 3.5 syncDeliver同步投递消息
- 4 延迟消息的总结
并发消息消费失败引发消费重试时,默认情况下重试16次,从延迟等级level3(10s)开始,每次延迟时间递增,时间到了又会发送到重试topic去消费,这其中就涉及到RocketMQ的延迟消息,可以说RocketMQ并发消息消费失败引发消费重试就是基于topic替换和延迟消息这两个技术实现的。
此前我们学习了RocketMQ的消费重试,我们知道在判断消息为延迟消息的时候,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id, id = level - 1,如果延迟级别大于最大级别,则设置为最大级别18,,默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复。
实际上普通的延迟消息也会进行topic替换,那么,发送到SCHEDULE_TOPIC_XXXX对应的消息队列里面的延迟消息,到底是做到能够在给定的延迟时间之后取出来重新投递的呢?下面我们来看看RocketMQ延迟消息的源码。
实际上RocketMQ通过ScheduleMessageService调度消息服务实现延迟(定时)消息。ScheduleMessageService继承了ConfigManager,在DefaultMessageStore实例化的时候被实例化。
1 load加载延迟消息数据
在broker启动执行DefaultMessageStore#load方法加载Commit Log、Consume Queue、index file等文件,将数据加载到内存中,并完成数据的恢复的时候,同样会执行ScheduleMessageService#load方法,加载延迟消息数据,初始化delayLevelTable和offsetTable。
首先调用父类的ConfigManager#load方法(在broker启动部分就讲过源码了),将延迟消息文件${user.home}/store/config/delayOffset.json加载到内存的offsetTable集合中,delayOffset.json中保存着延迟topic每个队列的消费进度(消费偏移量)。
/*** ScheduleMessageService的方法* <p>* 加载延迟消息数据,初始化delayLevelTable和offsetTable*/
@Override
public boolean load() {//调用父类ConfigManager#load方法,将延迟消息文件${user.home}/store/config/delayOffset.json加载到内存的offsetTable集合中//delayOffset.json中保存着延迟topic每个队列的消费进度(消费偏移量)boolean result = super.load();//解析延迟级别到delayLevelTable集合中result = result && this.parseDelayLevel();//矫正每个延迟队列的偏移量result = result && this.correctDelayOffset();return result;
}
/*** ScheduleMessageService的方法* <p>* 获取延迟消息文件路径${user.home}/store/config/delayOffset.json*/
@Override
public String configFilePath() {//${user.home}/store/config/delayOffset.jsonreturn StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
}/*** ScheduleMessageService的方法* <p>* json字符串转换为offsetTable对象*/
@Override
public void decode(String jsonString) {if (jsonString != null) {DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);if (delayOffsetSerializeWrapper != null) {this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());}}
}
延迟消息文件${user.home}/store/config/delayOffset.json的内容,它保存着延迟队列对应的消费偏移量。
{"offsetTable":{3:2,4:1}
}
1.1 parseDelayLevel解析延迟等级
延迟等级字符串存储在MessageStoreConfig的messageDelayLevel属性中,默认值为"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",即18个等级,因此也是可以配置的,但是单位仅支持s、m、h、d,分别表示秒、分、时、天。
该方法解析延迟等级以及对应的延迟时间到delayLevelTable中,单位统一转换为毫秒,注意延迟等级从1开始。
/*** ScheduleMessageService的方法* 解析延迟等级到delayLevelTable中* @return*/
public boolean parseDelayLevel() {//时间单位表HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();timeUnitTable.put("s", 1000L);timeUnitTable.put("m", 1000L * 60);timeUnitTable.put("h", 1000L * 60 * 60);timeUnitTable.put("d", 1000L * 60 * 60 * 24);//从MessageStoreConfig中获取延迟等级字符串messageDelayLevelString levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();try {//通过空格拆分String[] levelArray = levelString.split(" ");for (int i = 0; i < levelArray.length; i++) {//获取每个等级的延迟时间String value = levelArray[i];//获取延迟单位String ch = value.substring(value.length() - 1);//获取对应的延迟单位的时间毫秒Long tu = timeUnitTable.get(ch);//延迟等级,从1开始int level = i + 1;//如果当前等级已经大于最大等级,则赋值为最大等级if (level > this.maxDelayLevel) {this.maxDelayLevel = level;}//延迟时间long num = Long.parseLong(value.substring(0, value.length() - 1));//计算该等级的延迟时间毫秒long delayTimeMillis = tu * num;//存入delayLevelTable中this.delayLevelTable.put(level, delayTimeMillis);if (this.enableAsyncDeliver) {this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());}}} catch (Exception e) {log.error("parseDelayLevel exception", e);log.info("levelString String = {}", levelString);return false;}return true;
}
2 start启动调度消息服务
ScheduleMessageService依靠内部的定时任务实现延迟消息,ScheduleMessageService通过start方法完成启动。
在broker的启动过程中,会执行DefaultMessageStore的start方法中,该方法内部通过handleScheduleMessageService方法执行ScheduleMessageService的start方法。
该方法的大概逻辑为:
- 初始化延迟消息投递线程池deliverExecutorService,该线程池是一个调度任务线程池ScheduledThreadPoolExecutor,核心线程数就是最大的延迟等级,默认18。
- 遍历所有的延迟等级,为每一个延迟等级构建一个对应的DeliverDelayedMessageTimerTask调度任务放到deliverExecutorService中,默认延迟1000ms后执行。
- 构建一个延迟队列消费偏移量持久化的定时调度任务,首次延迟1000ms之后执行,后续每次执行间隔flushDelayOffsetInterval时间,默认10s。
/*** ScheduleMessageService的方法* <p>* 启动调度消息服务*/
public void start() {//将启动标志CAS的从false改为true,该服务只能启动一次if (started.compareAndSet(false, true)) {//调用父类的load方法,将延迟消息文件${user.home}/store/config/delayOffset.json加载到内存的offsetTable集合中//fix(dledger): reload the delay offset when master changed (#2518)super.load();/** 1 初始化延迟消息投递线程池,核心线程数就是最大的延迟等级,默认18*/this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));//异步投递,默认不支持if (this.enableAsyncDeliver) {this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));}/** 2 对所有的延迟等级构建一个对应的DeliverDelayedMessageTimerTask调度任务,默认延迟1000ms后执行*/for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {//延迟等级Integer level = entry.getKey();//延迟时间,毫秒Long timeDelay = entry.getValue();//根据延迟等级获取对应的延迟队列的消费偏移量,如果没有则设置为0Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}//延迟时间不为null,那么为该等级的延迟队列构建一个DeliverDelayedMessageTimerTask调度任务,默认延迟1000ms后执行if (timeDelay != null) {if (this.enableAsyncDeliver) {this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);}//DeliverDelayedMessageTimerTask构造参数包括对应的延迟等级,以及最新消费偏移量this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);}}/** 3 构建一个延迟队列消费偏移量持久化的定时调度任务,首次延迟1000ms之后执行,后续每次执行间隔flushDelayOffsetInterval时间,默认10s*/this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {if (started.get()) {ScheduleMessageService.this.persist();}} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);}
}
3 DeliverDelayedMessageTimerTask投递延迟消息任务
在start方法中,ScheduleMessageService会为每一个延迟等级创建一个DeliverDelayedMessageTimerTask投递延迟消息任务,不同延迟等级的消息放到不同的延迟队列里面,被不同的Task处理。
采用不同的队列处理同一个延迟等级的消息的方式,不再需要进行消息排序,避免了消息排序的复杂逻辑,能比较简单的实现有限等级的延迟消息,RocketMQ的开源版本不支持任意时间的延迟消息,这也是它的一个限制吧!
DeliverDelayedMessageTimerTask是一个线程任务,下面来看看它的run方法,主要是调用executeOnTimeup执行消息投递。
@Override
public void run() {try {//如果服务已启动,那么继续执行if (isStarted()) {//执行消息投递this.executeOnTimeup();}} catch (Exception e) {// XXX: warn and notify melog.error("ScheduleMessageService, executeOnTimeup exception", e);//抛出异常,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,10000ms后执行,本次任务结束this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);}
}
3.1 executeOnTimeup执行延迟消息投递
延迟消息的核心逻辑实现,执行延迟消息消息投递。
- 调用findConsumeQueue方法,根据topic和延迟队列id从consumeQueueTable查找需要写入的ConsumeQueue,如果没找到就新建,即ConsumeQueue文件是延迟创建的。该方法的源码我们在ReputMessageService异步构建ConsumeQueue和IndexFile部分已经讲过了。
- 如果没找到对应的消息队列,调用scheduleNextTimerTask方法,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束。
- 调用getIndexBuffer方法,根据逻辑offset定位到物理偏移量,然后截取该偏移量之后的一段Buffer,其包含要拉取的消息的索引数据及对应consumeQueue文件之后的全部索引数据,即这里截取的Buffer可能包含多条索引数据。该方法的源码我们在broker处理拉取消息请求部分已经讲过了。
- 遍历缓存buffer中的消息,根据tagsCode投递时间判断消息是否到期,如果到期则回复真实消息并且投递安到真实topic以及对应的queueId中。
- 获取该条目对应的消息的tagsCode,对于延迟消息,tagsCode被替换为延迟消息的发送时间(在CommitLog#checkMessageAndReturnSize方法中,源码此前讲过了)。
- 如果投递时间小于当前时间,那么可以投递该延迟消息。如果投递时间大于当前时间,那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束。
- 根据消息物理偏移量从commitLog中找到该条消息,调用messageTimeup方法构建内部消息对象,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId。即恢复为正常消息。
- 最后调用syncDeliver方法投递该消息,消息将会被投递到原始topic和队列中,这样就可以被消费了。
- 遍历结束,更新下一个offset,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束。保证线程任务的活性。
/*** DeliverDelayedMessageTimerTask的方法* <p>* 执行延迟消息消息投递*/
public void executeOnTimeup() {/** 1 根据topic和延迟队列id从consumeQueueTable查找需要写入的ConsumeQueue,如果没找到就新建,即ConsumeQueue文件是延迟创建的。* 该方法的源码我们在ReputMessageService异步构建ConsumeQueue和IndexFile部分已经讲过了*/ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));/** 2 如果没找到对应的消息队列,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束*/if (cq == null) {this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);return;}/** 3 根据逻辑offset定位到物理偏移量,然后截取该偏移量之后的一段Buffer,其包含要拉取的消息的索引数据及对应consumeQueue文件之后的全部索引数据。* 这里截取的Buffer可能包含多条索引数据,因为需要批量拉取多条消息,以及进行消息过滤。* 该方法的源码我们在broker处理拉取消息请求部分已经讲过了*/SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);//没获取到缓存bufferif (bufferCQ == null) {long resetOffset;//如果当前消息队列的最小偏移量 大于 当前偏移量,那么当前偏移量无效,设置新的offset为最小偏移量if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",this.offset, resetOffset, cq.getQueueId());}//如果当前消息队列的最大偏移量 小于 当前偏移量,那么当前偏移量无效,设置新的offset为最大偏移量else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",this.offset, resetOffset, cq.getQueueId());} else {resetOffset = this.offset;}//新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);return;}/** 3 遍历缓存buffer中的消息,根据tagsCode投递时间判断消息是否到期,如果到期则回复真实消息并且投递安到真实topic以及对应的queueId中*///下一个消费的offsetlong nextOffset = this.offset;try {//i表示consumeQueue消息索引大小int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();//遍历截取的Buffer中的consumeQueue消息索引,固定长度20bfor (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {//获取该条目对应的消息在commitlog文件中的物理偏移量long offsetPy = bufferCQ.getByteBuffer().getLong();//获取该条目对应的消息在commitlog文件中的总长度int sizePy = bufferCQ.getByteBuffer().getInt();//获取该条目对应的消息的tagsCode,对于延迟消息,tagsCode被替换为延迟消息的发送时间(CommitLog#checkMessageAndReturnSize方法中)long tagsCode = bufferCQ.getByteBuffer().getLong();//如果tagsCode是扩展文件地址if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}//当前时间戳long now = System.currentTimeMillis();//校验投递时间,必须小于等于当前时间 + 延迟时间long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);//计算下一个offsetnextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);//如果投递时间大于当前时间,那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束long countdown = deliverTimestamp - now;if (countdown > 0) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}//根据消息物理偏移量从commitLog中找到该条消息。MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt == null) {continue;}//构建内部消息对象,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId。即恢复为正常消息MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}boolean deliverSuc;/** 消息投递*/if (ScheduleMessageService.this.enableAsyncDeliver) {//异步投递,默认不支持deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);} else {//默认同步投递deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);}//如果投递失败,那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束if (!deliverSuc) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}}//遍历结束,更新下一个offsetnextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);} catch (Exception e) {log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);} finally {//释放内存bufferCQ.release();}/** 4 新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束** 保证线程任务的活性*/this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
3.2 scheduleNextTimerTask下一个调度任务
如果没找到对应的消息队列,或者没找到缓存buffer,或者没有过期的消息,获取投递失败等原因,将会新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束。
/*** DeliverDelayedMessageTimerTask的方法* <p>* 新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束** @param offset 消费偏移量* @param delay 延迟时间*/
public void scheduleNextTimerTask(long offset, long delay) {ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
}
3.3 correctDeliverTimestamp校验投递时间
校验投递时间,要求投递时间最晚不大于保证投递时间小于等于当前时间 + 延迟时间。
/*** DeliverDelayedMessageTimerTask的方法* <p>* 校验投递时间*/
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {//投递时间戳long result = deliverTimestamp;//当前时间 + 延迟时间long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);//保证投递时间小于等于当前时间 + 延迟时间if (deliverTimestamp > maxTimestamp) {result = now;}return result;
}
3.4 messageTimeup恢复正常消息
构建一个MessageExtBrokerInner,恢复为正常消息。设置topic的值为REAL_TOPIC属性值,这是原始topic,可能是重试topic或者真实topic。设置queueId的值为REAL_QID属性值,这是原始queueId,可能是重试queueId或者真实queueId。
/*** DeliverDelayedMessageTimerTask的方法* <p>* 还原原始消息,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId。即恢复为正常消息** @param msgExt 延迟消息* @return 真实消息*/
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {//构建MessageExtBrokerInner对象MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());//延迟消息的tagsCode为投递时间,现在来计算真正的tagsCodeValuelong tagsCodeValue =MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());msgInner.setTagsCode(tagsCodeValue);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());//不需要等待存储完成后才返回msgInner.setWaitStoreMsgOK(false);MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);//设置topic的值为REAL_TOPIC属性值,这是原始topic,可能是重试topic或者真实topicmsgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));//设置queueId的值为REAL_QID属性值,这是原始queueId,可能是重试queueId或者真实queueIdString queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);int queueId = Integer.parseInt(queueIdStr);msgInner.setQueueId(queueId);return msgInner;
}
3.5 syncDeliver同步投递消息
syncDeliver内部调用asyncPutMessage方法同步投递消息,投递完毕之后,更新offsetTable中的对应延迟队列的消费偏移量。
这里有个bug,第三个参数应该使用当前偏移量,而不是最开始的偏移量,在4.9.4版本已经修复。
/*** DeliverDelayedMessageTimerTask的方法* 同步投递* 这里有个bug,第三个参数应该使用当前偏移量,而不是最开始的偏移量,在4.9.4版本已经修复** @param msgInner 内部消息对象* @param msgId 消息id* @param offset 当前消费偏移量* @param offsetPy 消息物理偏移量* @param sizePy 消息大小* @return*/
private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,int sizePy) {//投递消息,内部调用asyncPutMessage方法PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);//投递结果PutMessageResult result = resultProcess.get();boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;if (sendStatus) {//如果发送成功,那么更新offsetTable中的消费偏移量ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());}return sendStatus;
}
4 延迟消息的总结
投递的消息在broker处理过成功,会判断如果是延迟消息,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id, id = level - 1,如果延迟级别大于最大级别,则设置为最大级别18,默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。
最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复。
而在RocketMQ端,通过ScheduleMessageService调度消息服务处理投递到SCHEDULE_TOPIC_XXXX的延迟消息。
从源码中可以看到,每个延迟级别都有一个线程专门处理该级别的延迟消息,这样避免了消息的排序,具体的处理逻辑其被封装为一个DeliverDelayedMessageTimerTask线程任务。
不同延迟级别的处理线程将会从各自对应的延迟队列中获取延迟消息,然后和当前时间比较看消息是否过期,如果消息过期,那么构造一个新的消息,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId,即恢复为正常消息然后再次进行投递。
另外,延迟消息的consumeQueue条目中的tagsCode并不是tag的hashCOde,而是该条消息的到期时间。
RocketMQ在内部通过内部topic替换实现延迟消息,非常巧妙,而且并发消息重试也是使用了延迟消息。
相关文章:
RocketMQ源码(24)—DefaultMQPushConsumer延迟消息源码
基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer延迟消息源码。 文章目录1 load加载延迟消息数据1.1 parseDelayLevel解析延迟等级2 start启动调度消息服务3 DeliverDelayedMessageTimerTask投递延迟消息任务3.1 executeOnTimeup执行延迟消息投递3.2…...

计算机视觉知识点(一)——交并比(IoU)及其若干改进
交并比(IoU)前言IoU公式及示意图IoU Loss缺点GIoU Loss公式及示意图缺点DIoU公式及示意图CIoU前言 目标检测是一个常见的计算机视觉任务,在目标检测任务中,交并比作为评判检测框的标准具有很重要的意义,在实际的应用中…...

一篇文章教你从零到一搭建自动化测试框架(附视频教程+源码)
目录 前言 1. 什么是自动化测试框架? 2. 没有万能的测试框架,适合自己项目的,能提高工作效率的就是好框架。 3. 设计框架的思路: 4.如何开展自动化测试 前言 关于测试框架的好处,比如快速回归提高测试效率&#x…...

【备战蓝桥杯】----01背包问题(动态规划)
🌹作者:云小逸 📝个人主页:云小逸的主页 📝Github:云小逸的Github 🤟motto:要敢于一个人默默的面对自己,强大自己才是核心。不要等到什么都没有了,才下定决心去做。种一颗树,最好的时间是十年前…...
Golang1.18新特性介绍——泛型
社区长期高呼的泛型特性在Golang 1.18中终于正式发布,Go泛型实现与传统的C有较大差异,更像Rust的泛型实现。本文详细介绍Golang泛型及其特性,包括泛型语法、类型参数、类型约束、类型近似以及constraints包提供内置类型等。 最近写Dao代码&am…...

【SpringBoot17】SpringBoot中使用Quartz管理定时任务
定时任务在系统中用到的地方很多,例如每晚凌晨的数据备份,每小时获取第三方平台的 Token 信息等等,之前我们都是在项目中规定这个定时任务什么时候启动,到时间了便会自己启动,那么我们想要停止这个定时任务的时候&…...

杨辉三角形 (蓝桥杯) JAVA
目录题目描述:暴力破解(四成):二分法破解(满分):题目描述: 下面的图形是著名的杨辉三角形: 如果我们按从上到下、从左到右的顺序把所有数排成一列,可以得到如…...

AI制药 - AlphaFold Multimer 的 MSA Pairing 源码
目前最新版本是v2.3.1,2023.1.12 AlphaFold multimer v1 于 2021 年 7 月发布,同时发表了一篇描述其方法和结果的论文。AlphaFold multimer v1 使用了与 AlphaFold 单体相同的模型结构和训练方法,但增加了一些特征和损失函数来处理多条链。Al…...

TitanIDE:云原生开发到底强在哪里?
原文作者:行云创新技术总监 邓冰寒 引言 是一种新的软件开发方法,旨在构建更可靠、高效、弹性、安全和可扩展的应用程序。与传统的应用程序开发方式不同,云原生是将开发环境完全搬到云端,构建一站式的云原生开发环境。云原生的开…...
单片机常用完整性校验算法
一、前言 单片机在开发过程中经常会遇到大文件传输,或者大量数据传输,在一些工业环境下,数据传输并不是很稳定,如何检验数据的完整性就是个问题,这里简单介绍一下单片机常用的几种数据完整性校验方法。 二、CheckSum校…...

Anaconda 的安装配置及依赖项的内外网配置
在分享anaconda 的安装配置及使用前,我们必须先明白anaconda是什么;Anaconda是一个开源的Python发行版本。两者区别在于前者是一门编程语言,后者相当于编程语言中的工具包。 由于python自身缺少numpy、matplotlib、scipy、scikit-learn等一系…...

p84 CTF夺旗-PHP弱类型异或取反序列化RCE
数据来源 文章参考 本课重点: 案例1:PHP-相关总结知识点-后期复现案例2:PHP-弱类型对比绕过测试-常考点案例3:PHP-正则preg_match绕过-常考点案例4:PHP-命令执行RCE变异绕过-常考点案例5:PHP-反序列化考题…...

2022财报逆转,有赞穿透迷雾实现突破
2022年,商家经营面临困难。但在一些第三方服务商的帮助下,也有商家取得了逆势增长。 2023年3月23日,有赞发布2022年业绩报告,它帮助许多商家稳住了一整年的经营。2022年,有赞门店SaaS业务的GMV达到425亿元,…...

蓝桥杯 - 求组合数【C(a,b)】+ 卡特兰数
文章目录💬前言885. 求组合数 I C(m,n) 【dp】886 求组合数 II 【数据大小10万级别】 【费马小定理快速幂逆元】887. 求组合数 III 【le18级别】 【卢卡斯定理 逆元 快速幂 】888.求组合数 IV 【没有%p -- 高精度算出准确结果】 【分解质因数 高精度乘法 --只用一…...

膳食真菌在癌症免疫治疗中的作用: 从肠道微生物群的角度
谷禾健康 癌症是一种恶性肿瘤,它可以发生在人体的任何部位,包括肺、乳房、结肠、胃、肝、宫颈等。根据世界卫生组织的数据,全球每年有超过1800万人被诊断出患有癌症,其中约有1000万人死于癌症。癌症已成为全球范围内的主要健康问题…...
怎么将模糊的照片变清晰
怎么将模糊的照片变清晰?珍贵的照片每个人都会有,而遇到珍贵的照片变模糊了,相信会让人很苦恼的。那么有没有办法可以解决呢?答案是有的,我们可以用工具让模糊的照片变得清晰。下面就来分享一些让模糊的照片变清晰的方法,有兴趣…...

【软件测试】基础知识第一篇
文章目录一. 什么是软件测试二. 测试和调试的区别三. 什么是测试用例四. 软件的生命周期五. 软件测试的生命周期一. 什么是软件测试 软件测试就是验证软件产品特性是否满足用户的需求。 那需求又是什么呢?在多数软件公司,会有两种需求,一种…...

【百面成神】java web基础7问,你能坚持到第几问
前 言 🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端 ☕专栏简介:纯手打总结面试题,自用备用 🌰 文章简介:java web最基础、重要的8道面试题 文章目…...

Centos7安装、各种环境配置和常见bug解决方案,保姆级教程(更新中)
文章目录前言一、Centos7安装二、各种环境配置与安装2.1 安装net-tools(建议)2.2 配置静态网络(建议)2.1 修改Centos7的时间(建议)2.2 Centos7系统编码问题2.3 vim安装(建议)2.4 解决…...

【C++进阶】智能指针
文章目录为什么需要智能指针?内存泄漏什么是内存泄漏,内存泄漏的危害内存泄漏分类(了解)如何避免内存泄漏智能指针的使用及原理smart_ptrauto_ptrunique_ptrshared_ptr线程安全的解决循环引用weak_ptr删除器为什么需要智能指针&am…...

网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...

TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例
一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...

Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...
【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统
Kafka从入门到实战:构建高吞吐量分布式消息系统 一、Kafka概述 Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。 Kafka核心特…...

Android写一个捕获全局异常的工具类
项目开发和实际运行过程中难免会遇到异常发生,系统提供了一个可以捕获全局异常的工具Uncaughtexceptionhandler,它是Thread的子类(就是package java.lang;里线程的Thread)。本文将利用它将设备信息、报错信息以及错误的发生时间都…...