当前位置: 首页 > news >正文

【RocketMQ】源码详解:Broker启动流程

Broker启动

入口: org.apache.rocketmq.broker.BrokerStartup#main

broker的启动主要分为两部分:1.创建brokerController 2.启动brokerController。与平时进行业务开发时不同的是,这里的BrokerController相当于Broker的一个中央控制器类,并不是编写http接口的类

  1. 创建brokerController:设置一些属性参数,读取外部配置文件,实例化brokerController并调用其初始化方法。

    1. 实例化brokerController:主要是会实例化很多的配置类和线程池的阻塞队列。
    2. 初始化方法 会加载如消费者消费进度等的文件,会实例化消息存储的服务类,并调用其加载方法,加载储存消息的相关文件到内存中,并进行数据恢复。还会创建各类线程池并注册netty服务的消息处理器,之后创建各种定时任务,如:持久化消费者消费进度的任务、定时打印各种日志的任务等。
  2. 启动brokerController: 内部会启动消息储存服务、netty服务、长轮询拉取消息挂起服务、向nameserver心跳定时任务等

public static void main(String[] args) {// broker启动start(createBrokerController(args));
}

创建brokerController

创建brokerController:org.apache.rocketmq.broker.BrokerStartup#createBrokerController

public static BrokerController createBrokerController(String[] args) {// 设置属性rocketmq.remoting.version,即当前rocketmq版本System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {NettySystemConfig.socketSndbufSize = 131072;}if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {NettySystemConfig.socketRcvbufSize = 131072;}try {//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),new PosixParser());if (null == commandLine) {System.exit(-1);}// 创建Broker的配置类,包含Broker的各种配置,比如 ROCKETMQ_HOMEfinal BrokerConfig brokerConfig = new BrokerConfig();// NettyServer的配置类,Broker接收来自客户端的消息的时候作为服务端final NettyServerConfig nettyServerConfig = new NettyServerConfig();// NettyClient的配置类,Broker连接NameServer的时候还会作为客户端final NettyClientConfig nettyClientConfig = new NettyClientConfig();nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));// 设置作为NettyServer时的监听端口为10911nettyServerConfig.setListenPort(10911);// Broker的消息存储配置,例如各种文件大小等final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}/** 判断命令行启动是否包含 -c 命令,用于指定配置文件* 如果包含,则解析指定的配置文件* 启动Broker的时候使用命令参数 -c /xxx/rocketmq/config/conf/broker.conf*/if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {configFile = file;InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);properties2SystemEnv(properties);MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);BrokerPathConfigHelper.setBrokerConfigPath(file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);if (null == brokerConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 获取namesrv地址String namesrvAddr = brokerConfig.getNamesrvAddr();if (null != namesrvAddr) {try {// 可以指定多个地址,以 ; 隔开,这里进行拆分String[] addrArray = namesrvAddr.split(";");for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",namesrvAddr);System.exit(-3);}}// 设置、校验brokerId,BrokerId为0表示Master,非0表示Slaveswitch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE:if (brokerConfig.getBrokerId() <= 0) {System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}// 是否开启DLeger,即多副本(主从切换集群)if (messageStoreConfig.isEnableDLegerCommitLog()) {brokerConfig.setBrokerId(-1);}// 设置高可用通信监听端口,为监听端口+1,默认就是10912// 该端口主要用于 如 主从同步之类的高可用操作messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");// 判断命令行中是否包含字符'p'(printConfigItem)和'm',如果存在则打印配置信息并结束jvm运行,没有的话就不用管if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} else if (commandLine.hasOption('m')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}// 打印当前broker的配置日志log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);// 实例化 BrokerController,内部主要初始化了一些配置类、manager类、处理器、线程池等final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discard// 将所有的-c的外部配置信息保存到BrokerController中的Configuration对象属性的allConfigs属性中controller.getConfiguration().registerConfig(properties);// 初始化BrokerControllerboolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 添加关闭钩子方法,在Broker关闭之前执行,进行一些内存清理、对象销毁等操作Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown = false;private AtomicInteger shutdownTimes = new AtomicInteger(0);@Overridepublic void run() {synchronized (this) {log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown = true;long beginTime = System.currentTimeMillis();controller.shutdown();long consumingTimeTotal = System.currentTimeMillis() - beginTime;log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}}, "ShutdownHook"));return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}

实例化brokerController

实例化brokerController: org.apache.rocketmq.broker.BrokerController#BrokerController

public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig
) {// broker的配置this.brokerConfig = brokerConfig;// 作为netty服务端与客户端交互的配置this.nettyServerConfig = nettyServerConfig;// 作为netty客户端与服务端交互的配置this.nettyClientConfig = nettyClientConfig;// 消息存储的配置this.messageStoreConfig = messageStoreConfig;// 消费者偏移量管理器,维护offset进度信息this.consumerOffsetManager = new ConsumerOffsetManager(this);//topic配置管理器,管理broker中存储的所有topic的配置this.topicConfigManager = new TopicConfigManager(this);// 处理消费者拉取消息请求的处理器this.pullMessageProcessor = new PullMessageProcessor(this);//拉取请求挂起服务,处理无消息时push长轮询消费者的挂起等待机制this.pullRequestHoldService = new PullRequestHoldService(this);//消息送达的监听器,生产者消息到达时通过该监听器触发pullRequestHoldService通知pullRequestHoldServicethis.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);//消费者id变化监听器this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);//消费者管理类,维护消费者组的注册实例信息以及topic的订阅信息,并对消费者id变化进行监听this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);//消费者过滤管理器,配置文件为:xx/config/consumerFilter.jsonthis.consumerFilterManager = new ConsumerFilterManager(this);//生产者管理器,包含生产者的注册信息,通过groupName分组this.producerManager = new ProducerManager();//客户端连接心跳服务,用于定时扫描生产者和消费者客户端,并将不活跃的客户端通道及相关信息移除this.clientHousekeepingService = new ClientHousekeepingService(this);//处理某些broker到客户端的请求,例如检查生产者的事务状态,重置offsetthis.broker2Client = new Broker2Client(this);//订阅分组关系管理器,维护消费者组的一些附加运维信息this.subscriptionGroupManager = new SubscriptionGroupManager(this);//broker对方访问的API,处理broker对外的发起请求,比如向nameServer注册,向master、slave发起的请求this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);//过滤服务管理器,拉取消息过滤this.filterServerManager = new FilterServerManager(this);//用于从节点,定时向主节点发起请求同步数据,例如topic配置、消费位移等this.slaveSynchronize = new SlaveSynchronize(this);/*初始化各种阻塞队列。将会被设置到对应的处理不同客户端请求的线程池执行器中*///处理来自生产者的发送消息的请求的队列this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());//处理reply消息的请求的队列,RocketMQ4.7.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返回响应消息,类似rpc调用效果。//即生产者发送了消息之后,可以同步或者异步的收到消费了这条消息的消费者的响应this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());//处理查询请求的队列this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());//客户端管理器的队列this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());//消费者管理器的队列,目前没用到this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());//心跳处理的队列this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());//事务消息相关处理的队列this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());//broker状态管理器,保存Broker运行时状态this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());//目前没用到this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));//broker快速失败服务this.brokerFastFailure = new BrokerFastFailure(this);//配置类this.configuration = new Configuration(log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);
}

初始化brokerController

初始化brokerController: org.apache.rocketmq.broker.BrokerController#initialize

public boolean initialize() throws CloneNotSupportedException {//topic配置文件加载,路径为  {user.home}/store/config/topics.jsonboolean result = this.topicConfigManager.load();// 消费者消费偏移量配置文件加载,路径为  {user.home}/store/config/consumerOffset.jsonresult = result && this.consumerOffsetManager.load();// 订阅分组配置文件加载,路径为  {user.home}/store/config/subscriptionGroup.jsonresult = result && this.subscriptionGroupManager.load();// 消费者过滤配置文件加载,路径为  {user.home}/store/config/consumerFilter.jsonresult = result && this.consumerFilterManager.load();if (result) {// 实例化和初始化消息存储服务相关类 DefaultMessageStoretry {// 实例化消息存储类DefaultMessageStorethis.messageStore =new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);/*** enableDLegerCommitLog 为 true(默认为false),则创建DLedgerRoleChangeHandler。* 在启用enableDLegerCommitLog情况下,broker通过raft协议选主,可以实现主从角色自动切换*/if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}/** 通过消息存储服务 加载 储存消息的相关文件到内存中,并进行数据恢复(此步骤是broker启动的核心步骤)* 文件:commitLog文件(储存消息的)、consumequeue文件(消费者依据此文件消费,储存指向commitLog中消息的偏移量)、indexFile文件* 数据恢复:broker可能会异常关闭,导致消息在文件中储存不完整 或 已储存到commitLog但未存储到consumequeue和indexFile*/result = result && this.messageStore.load();if (result) {// 创建netty远程服务,remotingServer和fastRemotingServerthis.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);/********************* 创建各种执行器线程池*********************/// 处理发送消息的请求的线程池this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));//处理拉取消息的请求的线程池this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));// 处理reply消息的请求的线程池(Reply模式允许Producer发出消息后,以同步或异步的形式等Consumer消费并返回一个响应消息,达到类似RPC的调用过程)this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getProcessReplyMessageThreadPoolNums(),this.brokerConfig.getProcessReplyMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.replyThreadPoolQueue,new ThreadFactoryImpl("ProcessReplyMessageThread_"));// 处理查询请求的线程池this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl("QueryMessageThread_"));// broker 管理线程池,作为默认处理器的线程池this.adminBrokerExecutor =Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl("AdminBrokerThread_"));// 客户端管理器的线程池this.clientManageExecutor = new ThreadPoolExecutor(this.brokerConfig.getClientManageThreadPoolNums(),this.brokerConfig.getClientManageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.clientManagerThreadPoolQueue,new ThreadFactoryImpl("ClientManageThread_"));// 心跳处理的线程池this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(),this.brokerConfig.getHeartbeatThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.heartbeatThreadPoolQueue,new ThreadFactoryImpl("HeartbeatThread_", true));// 事务消息相关处理的线程池this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getEndTransactionThreadPoolNums(),this.brokerConfig.getEndTransactionThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.endTransactionThreadPoolQueue,new ThreadFactoryImpl("EndTransactionThread_"));//消费者管理的线程池this.consumerManageExecutor =Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));// 注册netty服务的消息处理器this.registerProcessor();/********************** 启动各种定时任务     **********************/final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();final long period = 1000 * 60 * 60 * 24;// 每隔24h打印昨天生产和消费的消息数量this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);//每隔5s將消费者offset进行持久化,存入consumerOffset.json文件中this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);//每隔10s將消费过滤信息进行持久化,存入consumerFilter.json文件中this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);/*** 每3分钟检查消费进度,若消费进度落后超过 consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16\* 且disableConsumeIfConsumerReadSlowly=true(默认false)则剔除掉该订阅组,该消费者组停止消费消息来保护broker* 因为存储消息的commitLog一个文件大小才为1024L * 1024 * 1024。* ps:不清楚原因*/this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);//每隔1s將打印发送消息线程池队列、拉取消息线程池队列、查询消息线程池队列、结束事务线程池队列的大小以及队列头部元素存在时间this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);//每隔1min將打印已存储在commitlog提交日志中但尚未分派到consumequeue消费队列的字节数。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);if (this.brokerConfig.getNamesrvAddr() != null) {// 更新NamesrvAddrthis.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {/*** 如果未指定nameSvr地址,且开启从地址服务器获取* 则每2min从nameSvr地址服务器获取最新的地址并更新* 若希望动态更新nameSvr地址,则需要指定地址服务器url和fetchNamesrvAddrByAddressServer设置为true*/this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}//如果没有开启DLeger服务,DLeger开启后表示支持高可用的主从自动切换if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {// 如果是从节点,更新master地址if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}} else {//如果是主节点,每隔60s將打印主从节点的差异this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}// ....省略// 初始化事务消息相关服务initialTransaction();// 初始化权限相关服务initialAcl();// 初始化RPC调用的钩子函数initialRpcHooks();}return result;
}

启动brokerController

入口: org.apache.rocketmq.broker.BrokerStartup#start

public static BrokerController start(BrokerController controller) {try {// BrokerController启动controller.start();String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();if (null != controller.getBrokerConfig().getNamesrvAddr()) {tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();}log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}
public void start() throws Exception {//启动消息存储服务if (this.messageStore != null) {this.messageStore.start();}//启动netty远程服务if (this.remotingServer != null) {this.remotingServer.start();}//启动netty远程服务VIP通道if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}if (this.fileWatchService != null) {this.fileWatchService.start();}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}//长轮询拉取消息挂起服务启动if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}//客户端连接心跳服务启动if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}if (this.filterServerManager != null) {this.filterServerManager.start();}// 如果没有开启DLegerif (!messageStoreConfig.isEnableDLegerCommitLog()) {//如果不是SLAVE,那么启动事务消息检查服务startProcessorByHa(messageStoreConfig.getBrokerRole());//如果是SLAVE,则启动主从同步服务, 定时任务每隔10s与master机器同步数据,采用slave主动拉取的方法//同步的内容包括topic配置,消费者消费位移、延迟消息偏移量、订阅组信息等handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}// 定时任务,默认每30s向namesvr发起一次注册,即心跳包。时间间隔可配置registerNameServerPeriod,1万到6万毫秒间。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}

相关文章:

【RocketMQ】源码详解:Broker启动流程

Broker启动 入口&#xff1a; org.apache.rocketmq.broker.BrokerStartup#main broker的启动主要分为两部分&#xff1a;1.创建brokerController 2.启动brokerController。与平时进行业务开发时不同的是&#xff0c;这里的BrokerController相当于Broker的一个中央控制器类&…...

vue事件

1. 事件传参 <button click"clickEvt($event, 22)">点我</button>2. 事件修饰符 prevent&#xff1a;阻止默认事件stop&#xff1a;阻止事件冒泡&#xff08;加到子元素&#xff09;once&#xff1a;事件只触发一次capture&#xff1a;使用事件的捕获模…...

研报精选230220

目录 【行业230220国信证券】银行业行业专题&#xff1a;经济复苏中的优质中小银行【行业230220国信证券】汽车行业周报&#xff08;2023年第7周&#xff09;&#xff1a;吉利将发布新品牌“银河” &#xff0c;2022年宇通纯电动客车获欧洲销量冠军【行业230220开源证券】商贸零…...

kubernetes sd configs配置详解

1.基于Kubernetes的服务发现 kubernetes_sd_config 这个是以角色(role)来定义收集的&#xff0c;Kubernetes SD配置允许从Kubernetes的RESTAPI中检索scrape目标&#xff0c;并始终与群集状态保持同步。 凡<role>必须是endpoints&#xff0c;service&#xff0c;pod&…...

Linux查看文件的命令

目录 1、tail 2、head 3、cat 4、more 5、sed 6、less Linux查看日志的命令有多种: tail、cat、tac、head、echo等&#xff0c;本文只介绍几种常用的方法。 1、tail 命令格式: tail[必要参数][选择参数][文件] -f 循环读取 -q 不显示处理信息 -v 显示详细的处理信…...

如何单独清除某个网页的缓存(reload)

有时候在自己服务器上调试的时候&#xff0c;刷新一直不更新&#xff0c;样式改了也看不到&#xff0c;就很烦 今天教你一个方法快速清除 F12 控制台情况下右击左上角的刷新 这三个分别代表&#xff1a; ①正常重新加载(Ctrl R): 正常重新加载 此方法,浏览器发送请求时会…...

魔兽世界经典怀旧服务器架设教程

准备工具&#xff1a;MySQL服务端服务器最重要的你需要会技术、要不然都瞎扯 给你东西你也看不懂。教程开始&#xff1a;安装MySQL并创建数据库安装MySQL社区版&#xff0c;并配置SQL服务器。安装SQLyog。利用其登录&#xff0c;创建realmd、characters、mangos、scriptdev2数据…...

Interview系列 - 05 Java|Iterator迭代器|集合继承体系|Set List Map接口特性|List实现类区别

文章目录01. 迭代器 Iterator 是什么&#xff1f;02. 迭代器 Iterator 有什么特点&#xff1f;03. 迭代器 Iterator 怎么使用&#xff1f;04. 如何边遍历边移除 Collection 中的元素&#xff1f;05. Iterator 和 ListIterator 有什么区别&#xff1f;06. 数组和集合的区别&…...

LeetCode 1769. 移动所有球到每个盒子所需的最小操作数

有 n 个盒子。给你一个长度为 n 的二进制字符串 boxes &#xff0c;其中 boxes[i] 的值为 ‘0’ 表示第 i 个盒子是 空 的&#xff0c;而 boxes[i] 的值为 ‘1’ 表示盒子里有 一个 小球。 在一步操作中&#xff0c;你可以将 一个 小球从某个盒子移动到一个与之相邻的盒子中。…...

MKS SKIPR V1.0船长版(Voron 2.4 R2)配置简要笔记

第一次用MKS SKIPR V1.0&#xff0c;设置过程中&#xff0c;也不知道怎么回事&#xff0c;跟现有的资料有些出入。首先&#xff0c;基本的配置调试可以参考官方的使用说明。 MKS SKIPR V1.0 使用说明书 这个说明比较简单&#xff0c;很多深一点的东西没有提现&#xff0c;不过…...

90后,转行软件测试3年,从月入7000+到月入过万,整理出的这一万字经验分享。

周一发工资了&#xff0c;到手12857.65&#xff0c;美滋滋 今年是我毕业参加工作的第3年&#xff0c;工资终于来到5位数了。上一家公司月薪7000&#xff0c;实际拿到手就6450左右&#xff0c;感觉今年真的是元气满满啊&#xff0c;工资翻倍&#xff0c;良好的人生开端。 想起…...

Java之关于String字符串笔试面试重点

目录 一.关于字符串的常量池 1.关于字符串产生的三种方式 2.关于字符串的常量池 3.直接赋值法和new的方式产生对象的区别 二.关于intern方法 1.情况一(已经包含) 2.情况二(已经包含) 3.情况三(未包含) 4.情况四 三.关于字符串的不可变性 1.了解字符串的不可变性 2.Str…...

mdio协议

1. 简介 MDIO接口中有特定的术语定义总线上的各种设备&#xff0c;驱动MDIO总线的设备被定义为站管理实体&#xff08;STA&#xff09;&#xff0c;而被MDC管理的目标设备称为可被MDIO管理的设备&#xff08;MMD&#xff09;。 STA初始化MDIO所有的通信&#xff0c;同时负责驱动…...

kubectl命令

kubectl命令是操作 Kubernetes 集群的最直接和最高效的途径。 1、kubectl自动补全 $ source <(kubectl completion bash) # setup autocomplete in bash, bash-completion package should be installed first. $ source <(kubectl completion zsh) # setup autocomple…...

题库-JAVASE01

文章目录1.JAVA开发环境2.JAVA变量3.JAVA基本类型4.运算符和表达式5.分支结构6.循环结构7.数组8.方法1.JAVA开发环境 (单选题)在Java中&#xff0c;以下描述错误的是&#xff08; &#xff09; A…class是源文件 B…java是编译前的源文件 C…class是编译后的文件 D.Java程序需…...

Java序列化机制

Java序列化机制 概述 java中的序列化可能都停留在实现Serializable接口上&#xff0c;对于它里面的一些核心机制没有深入了解过。直到最近在项目中踩了一个坑&#xff0c;就是序列化对象添加一个字段以后&#xff0c;使用方系统报了反序列化失败&#xff0c;原因是我们双方的…...

3款强大到离谱电脑软件,都是效率神器,从此远离加班

闲话少说&#xff0c;直接上狠货。 1、ImageGlass ImageGlass是一款值得吹爆的电脑图片浏览工具&#xff0c;使用极其方便&#xff0c;体积50M左右&#xff0c;非常小巧&#xff0c;功能却强大到离谱&#xff0c;ImageGlass打开图片的速度极快&#xff0c;实现快速不同图像间切…...

【项目】Vue3+TS CMS 登录模块搭建

&#x1f4ad;&#x1f4ad; ✨&#xff1a;Vue3 TS   &#x1f49f;&#xff1a;东非不开森的主页   &#x1f49c;: keep going&#x1f49c;&#x1f49c;   &#x1f338;: 如有错误或不足之处&#xff0c;希望可以指正&#xff0c;非常感谢&#x1f609;   Vue3TS一、…...

Java 8 的那些常见写法

前言 现在Java已经发展到Java19版本了&#xff0c;由于Java后面一些版本&#xff0c;就开始商用收费了&#xff0c;所以目前绝大多数公司的JDK版本都是采用的之前稳定且免费的1.8版本&#xff0c;也就是Java8&#xff0c;这个版本已经能满足几乎所有业务的需求开发了&#xff…...

PyQt5数据库开发1 4.3 QSqlTableModel 之 相关槽函数的实现(多图长文详解)

目录 一、打开数据库表 1. 写打开数据库的槽函数 2. 运行后发现数据库可以打开了 3. ODBC配通了&#xff0c;数据库还是打不开 4. 写在tableView上显示数据库表的函数 5. 运行后发现表可以显示了 6. 代码分析 7. 添加列名称 8. 根据内容调整列宽 9. 备注&#xff1a;…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad&#xff08;Adaptive Gradient Algorithm&#xff09;是一种自适应学习率的优化算法&#xff0c;由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率&#xff0c;适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

Oracle查询表空间大小

1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...

如何在看板中体现优先级变化

在看板中有效体现优先级变化的关键措施包括&#xff1a;采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中&#xff0c;设置任务排序规则尤其重要&#xff0c;因为它让看板视觉上直观地体…...

OkHttp 中实现断点续传 demo

在 OkHttp 中实现断点续传主要通过以下步骤完成&#xff0c;核心是利用 HTTP 协议的 Range 请求头指定下载范围&#xff1a; 实现原理 Range 请求头&#xff1a;向服务器请求文件的特定字节范围&#xff08;如 Range: bytes1024-&#xff09; 本地文件记录&#xff1a;保存已…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

python执行测试用例,allure报乱码且未成功生成报告

allure执行测试用例时显示乱码&#xff1a;‘allure’ &#xfffd;&#xfffd;&#xfffd;&#xfffd;&#xfffd;ڲ&#xfffd;&#xfffd;&#xfffd;&#xfffd;ⲿ&#xfffd;&#xfffd;&#xfffd;Ҳ&#xfffd;&#xfffd;&#xfffd;ǿ&#xfffd;&am…...

HDFS分布式存储 zookeeper

hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架&#xff0c;允许使用简单的变成模型跨计算机对大型集群进行分布式处理&#xff08;1.海量的数据存储 2.海量数据的计算&#xff09;Hadoop核心组件 hdfs&#xff08;分布式文件存储系统&#xff09;&a…...

vulnyx Blogger writeup

信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面&#xff0c;gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress&#xff0c;说明目标所使用的cms是wordpress&#xff0c;访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...

基于Springboot+Vue的办公管理系统

角色&#xff1a; 管理员、员工 技术&#xff1a; 后端: SpringBoot, Vue2, MySQL, Mybatis-Plus 前端: Vue2, Element-UI, Axios, Echarts, Vue-Router 核心功能&#xff1a; 该办公管理系统是一个综合性的企业内部管理平台&#xff0c;旨在提升企业运营效率和员工管理水…...