当前位置: 首页 > news >正文

主从同步机制

        RocketMQ的Broker分为Master和Slave两个角色,为了保证高可用性,Master角色的机器接收到消息后,要把内容同步到Slave机器上,这样一旦Master宕机,Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实现的源码。

1 同步属性信息

Slave需要和Master同步的不只是消息本身,一些元数据信息也需要同步,比如TopicConfig信息、ConsumerOffset信息、DelayOffset和SubscriptionGroupConfig信息。Broker在启动的时候,判断自己的角色是否是Slave,是的话就启动定时同步任务,如代码清单12-1所示。

代码清单12-1 Slave角色定时同步元数据信息

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}

 

在syncAll函数里,调用syncTopicConfig()、getAllConsumerOffset()、syncDelayOffset()和syncSubscriptionGroupConfig()进行元数据同步。我们以syncConsumerOffset为例,来看看底层的具体实现,如代码清单12-2所示。

代码清单12-2 getAllConsumerOffset具体实现

public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBroker-Exception {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark());
}

 

getAllConsumerOffset()的基本逻辑是组装一个RemotingCommand,底层通过Netty将消息发送到Master角色的Broker,然后获取Offset信息。

2 同步消息体

下面介绍Master和Slave之间同步消息体内容的方法,也就是同步CommitLog内容的方法。CommitLog和元数据信息不同:首先,CommitLog的数据量比元数据要大;其次,对实时性和可靠性要求也不一样。元数据信息是定时同步的,在两次同步的时间差里,如果出现异常可能会造成Master上的元数据内容和Slave上的元数据内容不一致,不过这种情况还可以补救(手动调整Offset,重启Consumer等)。CommitLog在高可靠性场景下如果没有及时同步,一旦Master机器出故障,消息就彻底丢失了。所以有专门的代码来实现Master和Slave之间消息体内容的同步。

主要的实现代码在Broker模块的org.apache.rocketmq.store.ha包中,里面包括HAService、HAConnection和WaitNotifyObject这三个类。

HAService是实现commitLog同步的主体,它在Master机器和Slave机器上执行的逻辑不同,默认是在Master机器上执行,见代码清单12-3。

代码清单12-3 根据Broker角色,确定是否设置HaMasterAddress

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig
        .getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }

 

当Broker角色是Slave的时候,MasterAddr的值会被正确设置,这样HAService在启动的时候,在HAClient这个内部类中,connectMaster会被正确执行,如代码清单12-4所示。

代码清单12-4 Slave角色连接Master

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}

 

从代码中可以看出,HAClient试图通过Java NIO函数去连接Master角色的Broker。Master角色有相应的监听代码,如代码清单12-5所示。

代码清单12-5 监听Slave的HA连接

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

CommitLog的同步,不是经过netty command的方式,而是直接进行TCP连接,这样效率更高。连接成功以后,通过对比Master和Slave的Offset,不断进行同步。

3 sync_master和async_master

sync_master和async_master是写在Broker配置文件里的配置参数,这个参数影响的是主从同步的方式。从字面意思理解,sync_master是同步方式,也就是Master角色Broker中的消息要立刻同步过去;async_master是异步方式,也就是Master角色Broker中的消息是通过异步处理的方式同步到Slave角色的机器上的。下面结合代码来分析,sync_master下的消息同步如代码清单12-6所示。

代码清单12-6 sync_master下的消息同步

public void handleHA(AppendMessageResult result,
    PutMessageResult putMessageResult, MessageExt messageExt) {
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore
        .getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result
                .getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest
                    (result.getWroteOffset() + result
                    .getWroteBytes());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore
                        .getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, " +
                        "but failed, topic: " + messageExt
                        .getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " +
                        messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus
                        .FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus
                    .SLAVE_NOT_AVAILABLE);
            }
        }
    }
}

 

在CommitLog类的putMessage函数末尾,调用handleHA函数。代码中的关键词是wakeupAll和waitForFlush,在同步方式下,Master每次写消息的时候,都会等待向Slave同步消息的过程,同步完成后再返回,如代码清单12-7所示。(putMessage函数比较长,仅列出关键的代码)。

代码清单12-7 putMessage中调用handleHA

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore
        .getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

  ……

    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}

 

相关文章:

主从同步机制

RocketMQ的Broker分为Master和Slave两个角色,为了保证高可用性,Master角色的机器接收到消息后,要把内容同步到Slave机器上,这样一旦Master宕机,Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实…...

Leetcode算法系列| 3. 无重复字符的最长子串

目录 1.题目2.题解C# 解法一:滑动窗口算法C# 解法二:索引寻找Java 解法一:滑动窗口算法Java 解法二:遍历字符串 1.题目 给定一个字符串 s ,请你找出其中不含有重复字符的 最长子串 的长度。 示例1: 输入: s "ab…...

Spring Cache(缓存框架)

学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。各位小伙伴,如果您: 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持,想组团高效学习… 想写博客但无从下手,急需…...

android开发:安卓13Wifi和热点查看与设置功能

近日对安卓热点功能做了一些技术验证,目的是想利用手机开热点给设备做初始化,用的是安卓13,简言之: 热点设置功能不可用,不可设置SSID和密码,不可程序控制开启关闭,网上的代码统统都过时了Loca…...

Java中的mysql——面试题+答案——第24期

当涉及MySQL时,面试题可以涵盖更多高级主题、安全性和实践经验。 MySQL中的存储引擎InnoDB和MyISAM的区别是什么? 答案: InnoDB支持事务,而MyISAM不支持。InnoDB使用行级锁,而MyISAM使用表级锁。InnoDB支持外键&#x…...

王者小游戏

游戏里的经验动物 Bear package beast; import sxt.GameFrame; public class Bear extends Beast {public Bear(int x, int y, GameFrame gameFrame) {super(x, y, gameFrame);setImg("C:\\Users\\辛欣\\OneDrive\\桌面\\王者荣耀图片(1)\\王者荣耀图片\\beast\\bear.jp…...

using meta-SQL 使用元SQL

%DatePart Syntax %DatePart(DTTM_Column) Description The %DatePart meta-SQL variable returns the date portion of the specified DateTime column. DatePart meta-SQL变量返回指定的DateTime列的日期部分。 Note: This meta-SQL variable is not implemented for COBOL. …...

函数式接口

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO 联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬 咱们今天讨论下函数式接…...

使用shell快速查看电脑曾经连接过的WiFi密码

此方法只能查看以前连接过的wifi名称和对应的密码 查看连接过的WiFi名称netsh wlan show profiles查看具体的WiFi名称netsh wlan show profile name"你的wifi名称" keyclear...

通过亚马逊云科技云存储服务探索云原生应用的威力

文章作者:Libai 欢迎来到我们关于“使用亚马逊云科技云存储服务构建云原生应用”的文章的第一部分。在本文中,我们将深入探讨云原生应用的世界,并探索亚马逊云科技云存储服务在构建和扩展这些应用中的关键作用。 亚马逊云科技开发者社区为开发…...

Boot工程快速启动【Linux】

Boot工程快速启动【Linux】 在idea中打包cd usr/在local文件夹下mkdir app进入app文件夹把打包好的文件(只上传其中的jar)上传到app文件下检查linux中的Java版本,保证和项目的Java 版本保持一致运行 java -jar sp补全***.jar想看效果得查询当…...

三 STM32F4使用Sys_Tick 实现微秒定时器和延时

更多细节参考这篇 1. 什么是时钟以及作用 1.1 什么是时钟 时钟是由电路产生的周期性的脉冲信号,相当于单片机的心脏 1.2 时钟对于STM32的作用 指令同步:cpu和内核外设使用时钟信号来进行指令同步数据传输控制: 时钟信号控制数据在内部总…...

唯创知音WT2003H系列MP3录音语音芯片:高精度ADC与DAC,强大IO驱动能力成就音频卓越

在音频领域里,高精度和强大的驱动能力一直是工程师们追求的目标。唯创知音的WT2003H系列MP3录音芯片恰好满足了这一需求,该芯片具备16 bit高精度的ADC及DAC功能,大功率的IO驱动能力,能够直接驱动64mA,为电子产品带来卓…...

记录Windows下安装redis的过程

开源博客项目Blog支持使用EasyCaching组件操作redis等缓存数据库,在继续学习开源博客项目Blog之前,准备先学习redis和EasyCaching组件的基本用法,本文记录在Windows下安装redis的过程。   虽然redis官网文档写着支持Linux、macOS、Windows等…...

7.5 Windows驱动开发:监控Register注册表回调

在笔者前一篇文章《内核枚举Registry注册表回调》中实现了对注册表的枚举,本章将实现对注册表的监控,不同于32位系统在64位系统中,微软为我们提供了两个针对注册表的专用内核监控函数,通过这两个函数可以在不劫持内核API的前提下实…...

NC56 XML 报文校验出错一例

好好的上线了、下午开完会告诉我有个凭证没法传入 NC 了。 请求报文如下&#xff1a; <?xml version"1.0" encodingUTF-8?> <ufinterface roottag"voucher" billtype"gl" replace"Y" receiver"10108" sender&q…...

STM32 ADC转换器、串口输出

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、ADC是什么&#xff1f;二、STM32的ADC2.1 认识STM32 ADC2.2转换方式2.3 为什么要校准&#xff1f;2.4 采样时间计算2.5 触发方式2.6 多通道采集解决方案2.7…...

[MySQL--基础]函数、约束

hello! 这里是欧_aita的频道。 今日语录:不管你觉得自己能做什么&#xff0c;或者你觉得你不能做什么&#xff0c;你都是对的。 祝福语&#xff1a;愿你的程序像太阳一样明亮&#xff0c;给世界带来温暖和光明。 大家可以在评论区畅所欲言&#xff0c;可以指出我的错误&#xf…...

企业数字化决策者深度分享

2023年11月18日&#xff0c;数聚股份应邀参加在台州椒江举办的数字中国企业峰会。本次会议中&#xff0c;诸多在企业数字化进程中做出重要贡献的高层管理者分享了各行各业极具引领性、创新性的数字化实践案例、产品和解决方案&#xff1b;数聚股份董事长陈庆华携其前瞻的数字化…...

JMeter压测常见面试问题

1、JMeter可以模拟哪些类型的负载&#xff1f; JMeter可以模拟各种类型的负载&#xff0c;包括但不限于Web应用程序、API、数据库、FTP、SMTP、JMS、SOAP / RESTful Web服务等。这使得JMeter成为一个功能强大且灵活的压力测试工具。 2、如何配置JMeter来进行分布式压力测试&a…...

Cursor实现用excel数据填充word模版的方法

cursor主页&#xff1a;https://www.cursor.com/ 任务目标&#xff1a;把excel格式的数据里的单元格&#xff0c;按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例&#xff0c;…...

K8S认证|CKS题库+答案| 11. AppArmor

目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作&#xff1a; 1&#xff09;、切换集群 2&#xff09;、切换节点 3&#xff09;、切换到 apparmor 的目录 4&#xff09;、执行 apparmor 策略模块 5&#xff09;、修改 pod 文件 6&#xff09;、…...

Qt Http Server模块功能及架构

Qt Http Server 是 Qt 6.0 中引入的一个新模块&#xff0c;它提供了一个轻量级的 HTTP 服务器实现&#xff0c;主要用于构建基于 HTTP 的应用程序和服务。 功能介绍&#xff1a; 主要功能 HTTP服务器功能&#xff1a; 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...

安宝特方案丨船舶智造的“AR+AI+作业标准化管理解决方案”(装配)

船舶制造装配管理现状&#xff1a;装配工作依赖人工经验&#xff0c;装配工人凭借长期实践积累的操作技巧完成零部件组装。企业通常制定了装配作业指导书&#xff0c;但在实际执行中&#xff0c;工人对指导书的理解和遵循程度参差不齐。 船舶装配过程中的挑战与需求 挑战 (1…...

免费数学几何作图web平台

光锐软件免费数学工具&#xff0c;maths,数学制图&#xff0c;数学作图&#xff0c;几何作图&#xff0c;几何&#xff0c;AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...

【SpringBoot自动化部署】

SpringBoot自动化部署方法 使用Jenkins进行持续集成与部署 Jenkins是最常用的自动化部署工具之一&#xff0c;能够实现代码拉取、构建、测试和部署的全流程自动化。 配置Jenkins任务时&#xff0c;需要添加Git仓库地址和凭证&#xff0c;设置构建触发器&#xff08;如GitHub…...

uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)

UniApp 集成腾讯云 IM 富媒体消息全攻略&#xff08;地理位置/文件&#xff09; 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型&#xff0c;核心实现方式&#xff1a; 标准消息类型&#xff1a;直接使用 SDK 内置类型&#xff08;文件、图片等&#xff09;自…...

c# 局部函数 定义、功能与示例

C# 局部函数&#xff1a;定义、功能与示例 1. 定义与功能 局部函数&#xff08;Local Function&#xff09;是嵌套在另一个方法内部的私有方法&#xff0c;仅在包含它的方法内可见。 • 作用&#xff1a;封装仅用于当前方法的逻辑&#xff0c;避免污染类作用域&#xff0c;提升…...

基于江科大stm32屏幕驱动,实现OLED多级菜单(动画效果),结构体链表实现(独创源码)

引言 在嵌入式系统中&#xff0c;用户界面的设计往往直接影响到用户体验。本文将以STM32微控制器和OLED显示屏为例&#xff0c;介绍如何实现一个多级菜单系统。该系统支持用户通过按键导航菜单&#xff0c;执行相应操作&#xff0c;并提供平滑的滚动动画效果。 本文设计了一个…...

从零开始了解数据采集(二十八)——制造业数字孪生

近年来&#xff0c;我国的工业领域正经历一场前所未有的数字化变革&#xff0c;从“双碳目标”到工业互联网平台的推广&#xff0c;国家政策和市场需求共同推动了制造业的升级。在这场变革中&#xff0c;数字孪生技术成为备受关注的关键工具&#xff0c;它不仅让企业“看见”设…...