主从同步机制
RocketMQ的Broker分为Master和Slave两个角色,为了保证高可用性,Master角色的机器接收到消息后,要把内容同步到Slave机器上,这样一旦Master宕机,Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实现的源码。
1 同步属性信息
Slave需要和Master同步的不只是消息本身,一些元数据信息也需要同步,比如TopicConfig信息、ConsumerOffset信息、DelayOffset和SubscriptionGroupConfig信息。Broker在启动的时候,判断自己的角色是否是Slave,是的话就启动定时同步任务,如代码清单12-1所示。
代码清单12-1 Slave角色定时同步元数据信息
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
在syncAll函数里,调用syncTopicConfig()、getAllConsumerOffset()、syncDelayOffset()和syncSubscriptionGroupConfig()进行元数据同步。我们以syncConsumerOffset为例,来看看底层的具体实现,如代码清单12-2所示。
代码清单12-2 getAllConsumerOffset具体实现
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
final String addr) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBroker-Exception {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
getAllConsumerOffset()的基本逻辑是组装一个RemotingCommand,底层通过Netty将消息发送到Master角色的Broker,然后获取Offset信息。
2 同步消息体
下面介绍Master和Slave之间同步消息体内容的方法,也就是同步CommitLog内容的方法。CommitLog和元数据信息不同:首先,CommitLog的数据量比元数据要大;其次,对实时性和可靠性要求也不一样。元数据信息是定时同步的,在两次同步的时间差里,如果出现异常可能会造成Master上的元数据内容和Slave上的元数据内容不一致,不过这种情况还可以补救(手动调整Offset,重启Consumer等)。CommitLog在高可靠性场景下如果没有及时同步,一旦Master机器出故障,消息就彻底丢失了。所以有专门的代码来实现Master和Slave之间消息体内容的同步。
主要的实现代码在Broker模块的org.apache.rocketmq.store.ha包中,里面包括HAService、HAConnection和WaitNotifyObject这三个类。
HAService是实现commitLog同步的主体,它在Master机器和Slave机器上执行的逻辑不同,默认是在Master机器上执行,见代码清单12-3。
代码清单12-3 根据Broker角色,确定是否设置HaMasterAddress
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig
.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
当Broker角色是Slave的时候,MasterAddr的值会被正确设置,这样HAService在启动的时候,在HAClient这个内部类中,connectMaster会被正确执行,如代码清单12-4所示。
代码清单12-4 Slave角色连接Master
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
从代码中可以看出,HAClient试图通过Java NIO函数去连接Master角色的Broker。Master角色有相应的监听代码,如代码清单12-5所示。
代码清单12-5 监听Slave的HA连接
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
CommitLog的同步,不是经过netty command的方式,而是直接进行TCP连接,这样效率更高。连接成功以后,通过对比Master和Slave的Offset,不断进行同步。
3 sync_master和async_master
sync_master和async_master是写在Broker配置文件里的配置参数,这个参数影响的是主从同步的方式。从字面意思理解,sync_master是同步方式,也就是Master角色Broker中的消息要立刻同步过去;async_master是异步方式,也就是Master角色Broker中的消息是通过异步处理的方式同步到Slave角色的机器上的。下面结合代码来分析,sync_master下的消息同步如代码清单12-6所示。
代码清单12-6 sync_master下的消息同步
public void handleHA(AppendMessageResult result,
PutMessageResult putMessageResult, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore
.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() + result
.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest
(result.getWroteOffset() + result
.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK =
request.waitForFlush(this.defaultMessageStore
.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, " +
"but failed, topic: " + messageExt
.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " +
messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus
.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus
.SLAVE_NOT_AVAILABLE);
}
}
}
}
在CommitLog类的putMessage函数末尾,调用handleHA函数。代码中的关键词是wakeupAll和waitForFlush,在同步方式下,Master每次写消息的时候,都会等待向Slave同步消息的过程,同步完成后再返回,如代码清单12-7所示。(putMessage函数比较长,仅列出关键的代码)。
代码清单12-7 putMessage中调用handleHA
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore
.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
……
handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
相关文章:
主从同步机制
RocketMQ的Broker分为Master和Slave两个角色,为了保证高可用性,Master角色的机器接收到消息后,要把内容同步到Slave机器上,这样一旦Master宕机,Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实…...
Leetcode算法系列| 3. 无重复字符的最长子串
目录 1.题目2.题解C# 解法一:滑动窗口算法C# 解法二:索引寻找Java 解法一:滑动窗口算法Java 解法二:遍历字符串 1.题目 给定一个字符串 s ,请你找出其中不含有重复字符的 最长子串 的长度。 示例1: 输入: s "ab…...
Spring Cache(缓存框架)
学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。各位小伙伴,如果您: 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持,想组团高效学习… 想写博客但无从下手,急需…...
android开发:安卓13Wifi和热点查看与设置功能
近日对安卓热点功能做了一些技术验证,目的是想利用手机开热点给设备做初始化,用的是安卓13,简言之: 热点设置功能不可用,不可设置SSID和密码,不可程序控制开启关闭,网上的代码统统都过时了Loca…...
Java中的mysql——面试题+答案——第24期
当涉及MySQL时,面试题可以涵盖更多高级主题、安全性和实践经验。 MySQL中的存储引擎InnoDB和MyISAM的区别是什么? 答案: InnoDB支持事务,而MyISAM不支持。InnoDB使用行级锁,而MyISAM使用表级锁。InnoDB支持外键&#x…...
王者小游戏
游戏里的经验动物 Bear package beast; import sxt.GameFrame; public class Bear extends Beast {public Bear(int x, int y, GameFrame gameFrame) {super(x, y, gameFrame);setImg("C:\\Users\\辛欣\\OneDrive\\桌面\\王者荣耀图片(1)\\王者荣耀图片\\beast\\bear.jp…...
using meta-SQL 使用元SQL
%DatePart Syntax %DatePart(DTTM_Column) Description The %DatePart meta-SQL variable returns the date portion of the specified DateTime column. DatePart meta-SQL变量返回指定的DateTime列的日期部分。 Note: This meta-SQL variable is not implemented for COBOL. …...
函数式接口
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO 联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬 咱们今天讨论下函数式接…...
使用shell快速查看电脑曾经连接过的WiFi密码
此方法只能查看以前连接过的wifi名称和对应的密码 查看连接过的WiFi名称netsh wlan show profiles查看具体的WiFi名称netsh wlan show profile name"你的wifi名称" keyclear...
通过亚马逊云科技云存储服务探索云原生应用的威力
文章作者:Libai 欢迎来到我们关于“使用亚马逊云科技云存储服务构建云原生应用”的文章的第一部分。在本文中,我们将深入探讨云原生应用的世界,并探索亚马逊云科技云存储服务在构建和扩展这些应用中的关键作用。 亚马逊云科技开发者社区为开发…...
Boot工程快速启动【Linux】
Boot工程快速启动【Linux】 在idea中打包cd usr/在local文件夹下mkdir app进入app文件夹把打包好的文件(只上传其中的jar)上传到app文件下检查linux中的Java版本,保证和项目的Java 版本保持一致运行 java -jar sp补全***.jar想看效果得查询当…...
三 STM32F4使用Sys_Tick 实现微秒定时器和延时
更多细节参考这篇 1. 什么是时钟以及作用 1.1 什么是时钟 时钟是由电路产生的周期性的脉冲信号,相当于单片机的心脏 1.2 时钟对于STM32的作用 指令同步:cpu和内核外设使用时钟信号来进行指令同步数据传输控制: 时钟信号控制数据在内部总…...
唯创知音WT2003H系列MP3录音语音芯片:高精度ADC与DAC,强大IO驱动能力成就音频卓越
在音频领域里,高精度和强大的驱动能力一直是工程师们追求的目标。唯创知音的WT2003H系列MP3录音芯片恰好满足了这一需求,该芯片具备16 bit高精度的ADC及DAC功能,大功率的IO驱动能力,能够直接驱动64mA,为电子产品带来卓…...
记录Windows下安装redis的过程
开源博客项目Blog支持使用EasyCaching组件操作redis等缓存数据库,在继续学习开源博客项目Blog之前,准备先学习redis和EasyCaching组件的基本用法,本文记录在Windows下安装redis的过程。 虽然redis官网文档写着支持Linux、macOS、Windows等…...
7.5 Windows驱动开发:监控Register注册表回调
在笔者前一篇文章《内核枚举Registry注册表回调》中实现了对注册表的枚举,本章将实现对注册表的监控,不同于32位系统在64位系统中,微软为我们提供了两个针对注册表的专用内核监控函数,通过这两个函数可以在不劫持内核API的前提下实…...
NC56 XML 报文校验出错一例
好好的上线了、下午开完会告诉我有个凭证没法传入 NC 了。 请求报文如下: <?xml version"1.0" encodingUTF-8?> <ufinterface roottag"voucher" billtype"gl" replace"Y" receiver"10108" sender&q…...
STM32 ADC转换器、串口输出
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、ADC是什么?二、STM32的ADC2.1 认识STM32 ADC2.2转换方式2.3 为什么要校准?2.4 采样时间计算2.5 触发方式2.6 多通道采集解决方案2.7…...
[MySQL--基础]函数、约束
hello! 这里是欧_aita的频道。 今日语录:不管你觉得自己能做什么,或者你觉得你不能做什么,你都是对的。 祝福语:愿你的程序像太阳一样明亮,给世界带来温暖和光明。 大家可以在评论区畅所欲言,可以指出我的错误…...
企业数字化决策者深度分享
2023年11月18日,数聚股份应邀参加在台州椒江举办的数字中国企业峰会。本次会议中,诸多在企业数字化进程中做出重要贡献的高层管理者分享了各行各业极具引领性、创新性的数字化实践案例、产品和解决方案;数聚股份董事长陈庆华携其前瞻的数字化…...
JMeter压测常见面试问题
1、JMeter可以模拟哪些类型的负载? JMeter可以模拟各种类型的负载,包括但不限于Web应用程序、API、数据库、FTP、SMTP、JMS、SOAP / RESTful Web服务等。这使得JMeter成为一个功能强大且灵活的压力测试工具。 2、如何配置JMeter来进行分布式压力测试&a…...
深入 Hadoop 高可用:Leader、Follower 、Observer」角色详解
在 Hadoop 高可用(HA)架构中,Leader 选举是保障集群稳定的核心机制 —— 我们常听说 Leader(主节点)和 Follower(从节点),但很少有人深入聊第三种关键角色:Observer&…...
跳表(Skip List):思想、优劣与应用场景完全解读
一、为什么需要跳表?在计算机科学中,我们经常需要一种数据结构,既能快速查找,又能高效插入和删除。数组的二分查找虽然快(O(log n)),但插入删除却需要移动大量元素(O(n))…...
卡尔曼滤波器开发实践之二:从理论到代码的五大公式实现解析
1. 卡尔曼滤波器五大公式的工程化理解 卡尔曼滤波器就像一位经验丰富的导航员,在充满噪声的数据海洋中为我们指引方向。我在实际项目中多次使用它来处理传感器数据,发现真正理解这五大公式的工程意义比死记硬背数学推导更重要。 1.1 预测与更新的双人舞 …...
Cursor Pro免费激活终极指南:突破API限制的完整技术解决方案
Cursor Pro免费激活终极指南:突破API限制的完整技术解决方案 【免费下载链接】cursor-free-vip [Support 0.45](Multi Language 多语言)自动注册 Cursor Ai ,自动重置机器ID , 免费升级使用Pro 功能: Youve reached yo…...
工业物联网通信困境:如何用j2mod Java Modbus库构建高效设备通信系统
工业物联网通信困境:如何用j2mod Java Modbus库构建高效设备通信系统 【免费下载链接】j2mod Enhanced Modbus library implemented in the Java programming language 项目地址: https://gitcode.com/gh_mirrors/j2/j2mod 在工业自动化和物联网系统开发中&a…...
【42】软考软件设计师——设计模式代码实战|单例/工厂/策略/观察者 真实业务案例精讲
摘要:本文是《软件设计师50讲通关|从零基础到工程师职称》专栏第42篇,属于模块五:算法与代码实战强化第四篇,聚焦软考上午选择题与下午代码填空题四大高频设计模式:单例模式(双重检查锁)、工厂模式、策略模式、观察者模式。全文超4800字,搭配Mermaid类图/时序图清晰展…...
R语言建模总“跑不通”?3步定位环境污染源:从.Rprofile到Sys.getenv()的深度诊断手册
第一章:R语言建模环境“跑不通”现象的典型表现与危害R语言建模环境中的“跑不通”并非指语法错误导致的立即报错,而是一类隐蔽性强、复现性差、定位困难的系统性失配问题。这类问题常在跨平台迁移、版本升级或协作开发中集中爆发,表面看似代…...
从Java转行大模型应用,Dify 本地部署和可视化智能体创建全流程(低代码 + 脚本)
Dify 是开源低代码 AI 应用开发平台,支持Docker 一键本地私有化部署,通过可视化拖拽即可创建智能体,无需复杂编码,还可通过脚本自动化部署与智能体配置。以下是完整实操方案:一、本地部署(Docker Compose&a…...
TavernAI高级功能探索:自定义设置与API集成的深度教程
TavernAI高级功能探索:自定义设置与API集成的深度教程 【免费下载链接】TavernAI Atmospheric adventure chat for AI language models (KoboldAI, NovelAI, Pygmalion, OpenAI chatgpt, gpt-4) 项目地址: https://gitcode.com/gh_mirrors/ta/TavernAI Taver…...
别再手写Verilog了!用Simulink HDL Coder快速搭建FPGA原型(附避坑指南)
从算法模型到硬件实现:Simulink HDL Coder高效FPGA开发实战 在数字信号处理和通信系统开发领域,FPGA因其并行计算能力和可重构特性成为算法加速的理想平台。然而,传统手写Verilog/VHDL的开发模式存在几个显著痛点:开发周期长&…...
