当前位置: 首页 > news >正文

【RocketMQ】源码详解:Broker启动流程

Broker启动

入口: org.apache.rocketmq.broker.BrokerStartup#main

broker的启动主要分为两部分:1.创建brokerController 2.启动brokerController。与平时进行业务开发时不同的是,这里的BrokerController相当于Broker的一个中央控制器类,并不是编写http接口的类

  1. 创建brokerController:设置一些属性参数,读取外部配置文件,实例化brokerController并调用其初始化方法。

    1. 实例化brokerController:主要是会实例化很多的配置类和线程池的阻塞队列。
    2. 初始化方法 会加载如消费者消费进度等的文件,会实例化消息存储的服务类,并调用其加载方法,加载储存消息的相关文件到内存中,并进行数据恢复。还会创建各类线程池并注册netty服务的消息处理器,之后创建各种定时任务,如:持久化消费者消费进度的任务、定时打印各种日志的任务等。
  2. 启动brokerController: 内部会启动消息储存服务、netty服务、长轮询拉取消息挂起服务、向nameserver心跳定时任务等

public static void main(String[] args) {// broker启动start(createBrokerController(args));
}

创建brokerController

创建brokerController:org.apache.rocketmq.broker.BrokerStartup#createBrokerController

public static BrokerController createBrokerController(String[] args) {// 设置属性rocketmq.remoting.version,即当前rocketmq版本System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {NettySystemConfig.socketSndbufSize = 131072;}if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {NettySystemConfig.socketRcvbufSize = 131072;}try {//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),new PosixParser());if (null == commandLine) {System.exit(-1);}// 创建Broker的配置类,包含Broker的各种配置,比如 ROCKETMQ_HOMEfinal BrokerConfig brokerConfig = new BrokerConfig();// NettyServer的配置类,Broker接收来自客户端的消息的时候作为服务端final NettyServerConfig nettyServerConfig = new NettyServerConfig();// NettyClient的配置类,Broker连接NameServer的时候还会作为客户端final NettyClientConfig nettyClientConfig = new NettyClientConfig();nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));// 设置作为NettyServer时的监听端口为10911nettyServerConfig.setListenPort(10911);// Broker的消息存储配置,例如各种文件大小等final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}/** 判断命令行启动是否包含 -c 命令,用于指定配置文件* 如果包含,则解析指定的配置文件* 启动Broker的时候使用命令参数 -c /xxx/rocketmq/config/conf/broker.conf*/if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {configFile = file;InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);properties2SystemEnv(properties);MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);BrokerPathConfigHelper.setBrokerConfigPath(file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);if (null == brokerConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 获取namesrv地址String namesrvAddr = brokerConfig.getNamesrvAddr();if (null != namesrvAddr) {try {// 可以指定多个地址,以 ; 隔开,这里进行拆分String[] addrArray = namesrvAddr.split(";");for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",namesrvAddr);System.exit(-3);}}// 设置、校验brokerId,BrokerId为0表示Master,非0表示Slaveswitch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE:if (brokerConfig.getBrokerId() <= 0) {System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}// 是否开启DLeger,即多副本(主从切换集群)if (messageStoreConfig.isEnableDLegerCommitLog()) {brokerConfig.setBrokerId(-1);}// 设置高可用通信监听端口,为监听端口+1,默认就是10912// 该端口主要用于 如 主从同步之类的高可用操作messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");// 判断命令行中是否包含字符'p'(printConfigItem)和'm',如果存在则打印配置信息并结束jvm运行,没有的话就不用管if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} else if (commandLine.hasOption('m')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}// 打印当前broker的配置日志log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);// 实例化 BrokerController,内部主要初始化了一些配置类、manager类、处理器、线程池等final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discard// 将所有的-c的外部配置信息保存到BrokerController中的Configuration对象属性的allConfigs属性中controller.getConfiguration().registerConfig(properties);// 初始化BrokerControllerboolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 添加关闭钩子方法,在Broker关闭之前执行,进行一些内存清理、对象销毁等操作Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown = false;private AtomicInteger shutdownTimes = new AtomicInteger(0);@Overridepublic void run() {synchronized (this) {log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown = true;long beginTime = System.currentTimeMillis();controller.shutdown();long consumingTimeTotal = System.currentTimeMillis() - beginTime;log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}}, "ShutdownHook"));return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}

实例化brokerController

实例化brokerController: org.apache.rocketmq.broker.BrokerController#BrokerController

public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig
) {// broker的配置this.brokerConfig = brokerConfig;// 作为netty服务端与客户端交互的配置this.nettyServerConfig = nettyServerConfig;// 作为netty客户端与服务端交互的配置this.nettyClientConfig = nettyClientConfig;// 消息存储的配置this.messageStoreConfig = messageStoreConfig;// 消费者偏移量管理器,维护offset进度信息this.consumerOffsetManager = new ConsumerOffsetManager(this);//topic配置管理器,管理broker中存储的所有topic的配置this.topicConfigManager = new TopicConfigManager(this);// 处理消费者拉取消息请求的处理器this.pullMessageProcessor = new PullMessageProcessor(this);//拉取请求挂起服务,处理无消息时push长轮询消费者的挂起等待机制this.pullRequestHoldService = new PullRequestHoldService(this);//消息送达的监听器,生产者消息到达时通过该监听器触发pullRequestHoldService通知pullRequestHoldServicethis.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);//消费者id变化监听器this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);//消费者管理类,维护消费者组的注册实例信息以及topic的订阅信息,并对消费者id变化进行监听this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);//消费者过滤管理器,配置文件为:xx/config/consumerFilter.jsonthis.consumerFilterManager = new ConsumerFilterManager(this);//生产者管理器,包含生产者的注册信息,通过groupName分组this.producerManager = new ProducerManager();//客户端连接心跳服务,用于定时扫描生产者和消费者客户端,并将不活跃的客户端通道及相关信息移除this.clientHousekeepingService = new ClientHousekeepingService(this);//处理某些broker到客户端的请求,例如检查生产者的事务状态,重置offsetthis.broker2Client = new Broker2Client(this);//订阅分组关系管理器,维护消费者组的一些附加运维信息this.subscriptionGroupManager = new SubscriptionGroupManager(this);//broker对方访问的API,处理broker对外的发起请求,比如向nameServer注册,向master、slave发起的请求this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);//过滤服务管理器,拉取消息过滤this.filterServerManager = new FilterServerManager(this);//用于从节点,定时向主节点发起请求同步数据,例如topic配置、消费位移等this.slaveSynchronize = new SlaveSynchronize(this);/*初始化各种阻塞队列。将会被设置到对应的处理不同客户端请求的线程池执行器中*///处理来自生产者的发送消息的请求的队列this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());//处理reply消息的请求的队列,RocketMQ4.7.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返回响应消息,类似rpc调用效果。//即生产者发送了消息之后,可以同步或者异步的收到消费了这条消息的消费者的响应this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());//处理查询请求的队列this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());//客户端管理器的队列this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());//消费者管理器的队列,目前没用到this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());//心跳处理的队列this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());//事务消息相关处理的队列this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());//broker状态管理器,保存Broker运行时状态this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());//目前没用到this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));//broker快速失败服务this.brokerFastFailure = new BrokerFastFailure(this);//配置类this.configuration = new Configuration(log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);
}

初始化brokerController

初始化brokerController: org.apache.rocketmq.broker.BrokerController#initialize

public boolean initialize() throws CloneNotSupportedException {//topic配置文件加载,路径为  {user.home}/store/config/topics.jsonboolean result = this.topicConfigManager.load();// 消费者消费偏移量配置文件加载,路径为  {user.home}/store/config/consumerOffset.jsonresult = result && this.consumerOffsetManager.load();// 订阅分组配置文件加载,路径为  {user.home}/store/config/subscriptionGroup.jsonresult = result && this.subscriptionGroupManager.load();// 消费者过滤配置文件加载,路径为  {user.home}/store/config/consumerFilter.jsonresult = result && this.consumerFilterManager.load();if (result) {// 实例化和初始化消息存储服务相关类 DefaultMessageStoretry {// 实例化消息存储类DefaultMessageStorethis.messageStore =new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);/*** enableDLegerCommitLog 为 true(默认为false),则创建DLedgerRoleChangeHandler。* 在启用enableDLegerCommitLog情况下,broker通过raft协议选主,可以实现主从角色自动切换*/if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}/** 通过消息存储服务 加载 储存消息的相关文件到内存中,并进行数据恢复(此步骤是broker启动的核心步骤)* 文件:commitLog文件(储存消息的)、consumequeue文件(消费者依据此文件消费,储存指向commitLog中消息的偏移量)、indexFile文件* 数据恢复:broker可能会异常关闭,导致消息在文件中储存不完整 或 已储存到commitLog但未存储到consumequeue和indexFile*/result = result && this.messageStore.load();if (result) {// 创建netty远程服务,remotingServer和fastRemotingServerthis.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);/********************* 创建各种执行器线程池*********************/// 处理发送消息的请求的线程池this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));//处理拉取消息的请求的线程池this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));// 处理reply消息的请求的线程池(Reply模式允许Producer发出消息后,以同步或异步的形式等Consumer消费并返回一个响应消息,达到类似RPC的调用过程)this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getProcessReplyMessageThreadPoolNums(),this.brokerConfig.getProcessReplyMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.replyThreadPoolQueue,new ThreadFactoryImpl("ProcessReplyMessageThread_"));// 处理查询请求的线程池this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl("QueryMessageThread_"));// broker 管理线程池,作为默认处理器的线程池this.adminBrokerExecutor =Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl("AdminBrokerThread_"));// 客户端管理器的线程池this.clientManageExecutor = new ThreadPoolExecutor(this.brokerConfig.getClientManageThreadPoolNums(),this.brokerConfig.getClientManageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.clientManagerThreadPoolQueue,new ThreadFactoryImpl("ClientManageThread_"));// 心跳处理的线程池this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(),this.brokerConfig.getHeartbeatThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.heartbeatThreadPoolQueue,new ThreadFactoryImpl("HeartbeatThread_", true));// 事务消息相关处理的线程池this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getEndTransactionThreadPoolNums(),this.brokerConfig.getEndTransactionThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.endTransactionThreadPoolQueue,new ThreadFactoryImpl("EndTransactionThread_"));//消费者管理的线程池this.consumerManageExecutor =Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));// 注册netty服务的消息处理器this.registerProcessor();/********************** 启动各种定时任务     **********************/final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();final long period = 1000 * 60 * 60 * 24;// 每隔24h打印昨天生产和消费的消息数量this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);//每隔5s將消费者offset进行持久化,存入consumerOffset.json文件中this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);//每隔10s將消费过滤信息进行持久化,存入consumerFilter.json文件中this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);/*** 每3分钟检查消费进度,若消费进度落后超过 consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16\* 且disableConsumeIfConsumerReadSlowly=true(默认false)则剔除掉该订阅组,该消费者组停止消费消息来保护broker* 因为存储消息的commitLog一个文件大小才为1024L * 1024 * 1024。* ps:不清楚原因*/this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);//每隔1s將打印发送消息线程池队列、拉取消息线程池队列、查询消息线程池队列、结束事务线程池队列的大小以及队列头部元素存在时间this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);//每隔1min將打印已存储在commitlog提交日志中但尚未分派到consumequeue消费队列的字节数。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);if (this.brokerConfig.getNamesrvAddr() != null) {// 更新NamesrvAddrthis.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {/*** 如果未指定nameSvr地址,且开启从地址服务器获取* 则每2min从nameSvr地址服务器获取最新的地址并更新* 若希望动态更新nameSvr地址,则需要指定地址服务器url和fetchNamesrvAddrByAddressServer设置为true*/this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}//如果没有开启DLeger服务,DLeger开启后表示支持高可用的主从自动切换if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {// 如果是从节点,更新master地址if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}} else {//如果是主节点,每隔60s將打印主从节点的差异this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}// ....省略// 初始化事务消息相关服务initialTransaction();// 初始化权限相关服务initialAcl();// 初始化RPC调用的钩子函数initialRpcHooks();}return result;
}

启动brokerController

入口: org.apache.rocketmq.broker.BrokerStartup#start

public static BrokerController start(BrokerController controller) {try {// BrokerController启动controller.start();String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();if (null != controller.getBrokerConfig().getNamesrvAddr()) {tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();}log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}
public void start() throws Exception {//启动消息存储服务if (this.messageStore != null) {this.messageStore.start();}//启动netty远程服务if (this.remotingServer != null) {this.remotingServer.start();}//启动netty远程服务VIP通道if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}if (this.fileWatchService != null) {this.fileWatchService.start();}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}//长轮询拉取消息挂起服务启动if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}//客户端连接心跳服务启动if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}if (this.filterServerManager != null) {this.filterServerManager.start();}// 如果没有开启DLegerif (!messageStoreConfig.isEnableDLegerCommitLog()) {//如果不是SLAVE,那么启动事务消息检查服务startProcessorByHa(messageStoreConfig.getBrokerRole());//如果是SLAVE,则启动主从同步服务, 定时任务每隔10s与master机器同步数据,采用slave主动拉取的方法//同步的内容包括topic配置,消费者消费位移、延迟消息偏移量、订阅组信息等handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}// 定时任务,默认每30s向namesvr发起一次注册,即心跳包。时间间隔可配置registerNameServerPeriod,1万到6万毫秒间。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}

相关文章:

【RocketMQ】源码详解:Broker启动流程

Broker启动 入口&#xff1a; org.apache.rocketmq.broker.BrokerStartup#main broker的启动主要分为两部分&#xff1a;1.创建brokerController 2.启动brokerController。与平时进行业务开发时不同的是&#xff0c;这里的BrokerController相当于Broker的一个中央控制器类&…...

vue事件

1. 事件传参 <button click"clickEvt($event, 22)">点我</button>2. 事件修饰符 prevent&#xff1a;阻止默认事件stop&#xff1a;阻止事件冒泡&#xff08;加到子元素&#xff09;once&#xff1a;事件只触发一次capture&#xff1a;使用事件的捕获模…...

研报精选230220

目录 【行业230220国信证券】银行业行业专题&#xff1a;经济复苏中的优质中小银行【行业230220国信证券】汽车行业周报&#xff08;2023年第7周&#xff09;&#xff1a;吉利将发布新品牌“银河” &#xff0c;2022年宇通纯电动客车获欧洲销量冠军【行业230220开源证券】商贸零…...

kubernetes sd configs配置详解

1.基于Kubernetes的服务发现 kubernetes_sd_config 这个是以角色(role)来定义收集的&#xff0c;Kubernetes SD配置允许从Kubernetes的RESTAPI中检索scrape目标&#xff0c;并始终与群集状态保持同步。 凡<role>必须是endpoints&#xff0c;service&#xff0c;pod&…...

Linux查看文件的命令

目录 1、tail 2、head 3、cat 4、more 5、sed 6、less Linux查看日志的命令有多种: tail、cat、tac、head、echo等&#xff0c;本文只介绍几种常用的方法。 1、tail 命令格式: tail[必要参数][选择参数][文件] -f 循环读取 -q 不显示处理信息 -v 显示详细的处理信…...

如何单独清除某个网页的缓存(reload)

有时候在自己服务器上调试的时候&#xff0c;刷新一直不更新&#xff0c;样式改了也看不到&#xff0c;就很烦 今天教你一个方法快速清除 F12 控制台情况下右击左上角的刷新 这三个分别代表&#xff1a; ①正常重新加载(Ctrl R): 正常重新加载 此方法,浏览器发送请求时会…...

魔兽世界经典怀旧服务器架设教程

准备工具&#xff1a;MySQL服务端服务器最重要的你需要会技术、要不然都瞎扯 给你东西你也看不懂。教程开始&#xff1a;安装MySQL并创建数据库安装MySQL社区版&#xff0c;并配置SQL服务器。安装SQLyog。利用其登录&#xff0c;创建realmd、characters、mangos、scriptdev2数据…...

Interview系列 - 05 Java|Iterator迭代器|集合继承体系|Set List Map接口特性|List实现类区别

文章目录01. 迭代器 Iterator 是什么&#xff1f;02. 迭代器 Iterator 有什么特点&#xff1f;03. 迭代器 Iterator 怎么使用&#xff1f;04. 如何边遍历边移除 Collection 中的元素&#xff1f;05. Iterator 和 ListIterator 有什么区别&#xff1f;06. 数组和集合的区别&…...

LeetCode 1769. 移动所有球到每个盒子所需的最小操作数

有 n 个盒子。给你一个长度为 n 的二进制字符串 boxes &#xff0c;其中 boxes[i] 的值为 ‘0’ 表示第 i 个盒子是 空 的&#xff0c;而 boxes[i] 的值为 ‘1’ 表示盒子里有 一个 小球。 在一步操作中&#xff0c;你可以将 一个 小球从某个盒子移动到一个与之相邻的盒子中。…...

MKS SKIPR V1.0船长版(Voron 2.4 R2)配置简要笔记

第一次用MKS SKIPR V1.0&#xff0c;设置过程中&#xff0c;也不知道怎么回事&#xff0c;跟现有的资料有些出入。首先&#xff0c;基本的配置调试可以参考官方的使用说明。 MKS SKIPR V1.0 使用说明书 这个说明比较简单&#xff0c;很多深一点的东西没有提现&#xff0c;不过…...

90后,转行软件测试3年,从月入7000+到月入过万,整理出的这一万字经验分享。

周一发工资了&#xff0c;到手12857.65&#xff0c;美滋滋 今年是我毕业参加工作的第3年&#xff0c;工资终于来到5位数了。上一家公司月薪7000&#xff0c;实际拿到手就6450左右&#xff0c;感觉今年真的是元气满满啊&#xff0c;工资翻倍&#xff0c;良好的人生开端。 想起…...

Java之关于String字符串笔试面试重点

目录 一.关于字符串的常量池 1.关于字符串产生的三种方式 2.关于字符串的常量池 3.直接赋值法和new的方式产生对象的区别 二.关于intern方法 1.情况一(已经包含) 2.情况二(已经包含) 3.情况三(未包含) 4.情况四 三.关于字符串的不可变性 1.了解字符串的不可变性 2.Str…...

mdio协议

1. 简介 MDIO接口中有特定的术语定义总线上的各种设备&#xff0c;驱动MDIO总线的设备被定义为站管理实体&#xff08;STA&#xff09;&#xff0c;而被MDC管理的目标设备称为可被MDIO管理的设备&#xff08;MMD&#xff09;。 STA初始化MDIO所有的通信&#xff0c;同时负责驱动…...

kubectl命令

kubectl命令是操作 Kubernetes 集群的最直接和最高效的途径。 1、kubectl自动补全 $ source <(kubectl completion bash) # setup autocomplete in bash, bash-completion package should be installed first. $ source <(kubectl completion zsh) # setup autocomple…...

题库-JAVASE01

文章目录1.JAVA开发环境2.JAVA变量3.JAVA基本类型4.运算符和表达式5.分支结构6.循环结构7.数组8.方法1.JAVA开发环境 (单选题)在Java中&#xff0c;以下描述错误的是&#xff08; &#xff09; A…class是源文件 B…java是编译前的源文件 C…class是编译后的文件 D.Java程序需…...

Java序列化机制

Java序列化机制 概述 java中的序列化可能都停留在实现Serializable接口上&#xff0c;对于它里面的一些核心机制没有深入了解过。直到最近在项目中踩了一个坑&#xff0c;就是序列化对象添加一个字段以后&#xff0c;使用方系统报了反序列化失败&#xff0c;原因是我们双方的…...

3款强大到离谱电脑软件,都是效率神器,从此远离加班

闲话少说&#xff0c;直接上狠货。 1、ImageGlass ImageGlass是一款值得吹爆的电脑图片浏览工具&#xff0c;使用极其方便&#xff0c;体积50M左右&#xff0c;非常小巧&#xff0c;功能却强大到离谱&#xff0c;ImageGlass打开图片的速度极快&#xff0c;实现快速不同图像间切…...

【项目】Vue3+TS CMS 登录模块搭建

&#x1f4ad;&#x1f4ad; ✨&#xff1a;Vue3 TS   &#x1f49f;&#xff1a;东非不开森的主页   &#x1f49c;: keep going&#x1f49c;&#x1f49c;   &#x1f338;: 如有错误或不足之处&#xff0c;希望可以指正&#xff0c;非常感谢&#x1f609;   Vue3TS一、…...

Java 8 的那些常见写法

前言 现在Java已经发展到Java19版本了&#xff0c;由于Java后面一些版本&#xff0c;就开始商用收费了&#xff0c;所以目前绝大多数公司的JDK版本都是采用的之前稳定且免费的1.8版本&#xff0c;也就是Java8&#xff0c;这个版本已经能满足几乎所有业务的需求开发了&#xff…...

PyQt5数据库开发1 4.3 QSqlTableModel 之 相关槽函数的实现(多图长文详解)

目录 一、打开数据库表 1. 写打开数据库的槽函数 2. 运行后发现数据库可以打开了 3. ODBC配通了&#xff0c;数据库还是打不开 4. 写在tableView上显示数据库表的函数 5. 运行后发现表可以显示了 6. 代码分析 7. 添加列名称 8. 根据内容调整列宽 9. 备注&#xff1a;…...

web vue 项目 Docker化部署

Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段&#xff1a; 构建阶段&#xff08;Build Stage&#xff09;&#xff1a…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

2025盘古石杯决赛【手机取证】

前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来&#xff0c;实在找不到&#xff0c;希望有大佬教一下我。 还有就会议时间&#xff0c;我感觉不是图片时间&#xff0c;因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

Rust 异步编程

Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”

2025年#高考 将在近日拉开帷幕&#xff0c;#AI 监考一度冲上热搜。当AI深度融入高考&#xff0c;#时间同步 不再是辅助功能&#xff0c;而是决定AI监考系统成败的“生命线”。 AI亮相2025高考&#xff0c;40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕&#xff0c;江西、…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)

上一章用到了V2 的概念&#xff0c;其实 Fiori当中还有 V4&#xff0c;咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务)&#xff0c;代理中间件&#xff08;ui5-middleware-simpleproxy&#xff09;-CSDN博客…...

快刀集(1): 一刀斩断视频片头广告

一刀流&#xff1a;用一个简单脚本&#xff0c;秒杀视频片头广告&#xff0c;还你清爽观影体验。 1. 引子 作为一个爱生活、爱学习、爱收藏高清资源的老码农&#xff0c;平时写代码之余看看电影、补补片&#xff0c;是再正常不过的事。 电影嘛&#xff0c;要沉浸&#xff0c;…...

Xela矩阵三轴触觉传感器的工作原理解析与应用场景

Xela矩阵三轴触觉传感器通过先进技术模拟人类触觉感知&#xff0c;帮助设备实现精确的力测量与位移监测。其核心功能基于磁性三维力测量与空间位移测量&#xff0c;能够捕捉多维触觉信息。该传感器的设计不仅提升了触觉感知的精度&#xff0c;还为机器人、医疗设备和制造业的智…...

在golang中如何将已安装的依赖降级处理,比如:将 go-ansible/v2@v2.2.0 更换为 go-ansible/@v1.1.7

在 Go 项目中降级 go-ansible 从 v2.2.0 到 v1.1.7 具体步骤&#xff1a; 第一步&#xff1a; 修改 go.mod 文件 // 原 v2 版本声明 require github.com/apenella/go-ansible/v2 v2.2.0 替换为&#xff1a; // 改为 v…...