【RocketMQ 生产者和消费者】- 生产者启动源码-上报生产者和消费者心跳信息到 broker(3)
文章目录
- 1. 前言
- 2. sendHeartbeatToAllBrokerWithLock 上报心跳信息
- 3. prepareHeartbeatData 准备心跳数据
- 4. sendHearbeat 发送心跳上报请求
- 5. broker 处理心跳请求
- 5.1 heartBeat 处理心跳包
- 5.2 createTopicInSendMessageBackMethod 创建重传 topic
- 5.3 registerConsumer 注册消费者
- 5.3.1 updateChannel 更新消费者连接通道
- 5.3.2 updateSubscription 更新订阅消息
- 5.3.3 通知消费者重平衡
- 5.3.4 处理消费者注册事件
- 5.3.4.1 register 注册消费者过滤信息
- 5.3.4.2 register 创建 FilterDataMapByTopic 对象
- 5.3.4.3 register 注册过滤信息
- 6. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
- 【RocketMQ 生产者和消费者】- 生产者启动源码-启动流程(1)
- 【RocketMQ 生产者和消费者】- 生产者启动源码-创建 MQClientInstance(2)
上两篇文章中我们探讨了生产者的启动流程,这篇文章就来看下生产者启动之后如何发送心跳信息到 broker。
2. sendHeartbeatToAllBrokerWithLock 上报心跳信息
这个方法就是用于上报生产者的心跳信息到所有的 broker,不过可以注意到这个方法是在 MQClientInstance 里面的,上一篇文章我们就说了 MQClientInstance 是生产者和消费者的公共类,所以这个方法上报心跳的时候会把消费者和生产者的信息都一起上报到 broker 中,当然这里的生产者和消费者是指同一个进程的。
/*** 发送心跳信息到所有 broker*/
private void sendHeartbeatToAllBroker() {// 心跳数据,这里是先准备一个心跳包final HeartbeatData heartbeatData = this.prepareHeartbeatData();final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();// 如果没有任何生产者和消费者的数据,就不需要发送心跳包if (producerEmpty && consumerEmpty) {log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);return;}// brokerAddrTable 不为空if (!this.brokerAddrTable.isEmpty()) {// 发送心跳的次数 + 1long times = this.sendHeartbeatTimesTotal.getAndIncrement();// 遍历所有 brokerIterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, HashMap<Long, String>> entry = it.next();// 获取 broker 名字String brokerName = entry.getKey();// 这里的 oneTable 是指 id -> address, 每一个 broker 都需要记录下来这个 broker 所在的集群的其他 broker 地址// oneTable 的 key 就是 id,0 表示主节点,其他数字表示从节点,value 就是节点的地址HashMap<Long, String> oneTable = entry.getValue();if (oneTable != null) {for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {Long id = entry1.getKey();String addr = entry1.getValue();if (addr != null) {// 这里就是如果消费者配置为空,并且当前的 broker 节点不是 Master 节点,就没必要注册到从节点上面了if (consumerEmpty) {if (id != MixAll.MASTER_ID)// producer 生产者只需要和从节点发送心跳即可, 但是如果也有消费者,那么也可以往从节点continue;}try {// 这里就是把心跳包发送给 brokerint version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());if (!this.brokerVersionTable.containsKey(brokerName)) {this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));}// 将版本设置到 brokerVersionTable 中// brokerName -> (address, version) 的映射关系this.brokerVersionTable.get(brokerName).put(addr, version);// 每发送 20 次心跳就打印一次日志if (times % 20 == 0) {log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);log.info(heartbeatData.toString());}} catch (Exception e) {if (this.isBrokerInNameServer(addr)) {log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);} else {log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,id, addr, e);}}}}}}}
}
上面就是这个方法的源码,首先就通过 prepareHeartbeatData
准备一个心跳包,如果说心跳包里面的生产者和消费者都为空,就说明不需要发送心跳信息,直接返回。
如果不为空,需要发送心跳信息,需要遍历所有 broker,由于 broker 集群是以 brokerName 为标记,所以会遍历 brokerAddrTable
集合中的所有 key(brokerName),然后处理 value,value 是 HashMap<Long, String> 类型,Long 是 brokerId,String 是这个 broker 的地址,意思就是 brokerName 集群下面的主从节点。
遍历 broker 时会判断如果心跳包里面的消费者配置为空,并且当前的 broker 节点不是 Master 节点,就没必要注册到从节点上面了,生产者只需要负责和主节点建立心跳,因为生产者生产消息都是直接存储到主节点的,从节点负责同步。
接下来调用 sendHearbeat
把心跳包发送给 broker,返回 broker 记录的心跳信息的版本,然后存储到本地缓存 brokerVersionTable
中。
/*** broker 信息版本, brokerName 集群 -> (address -> 版本)*/
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =new ConcurrentHashMap<String, HashMap<String, Integer>>();
3. prepareHeartbeatData 准备心跳数据
/*** 准备心跳数据包* @return*/
private HeartbeatData prepareHeartbeatData() {// 心跳包HeartbeatData heartbeatData = new HeartbeatData();// 客户端 IDheartbeatData.setClientID(this.clientId);// 遍历所有消费者for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {// 构建 ConsumerData 数据ConsumerData consumerData = new ConsumerData();// 消费者组consumerData.setGroupName(impl.groupName());// 消费类型: PULL 和 PUSHconsumerData.setConsumeType(impl.consumeType());// 消费模式: CLUSTERING 和 BROADCASTING, 也就是集群和广播consumerData.setMessageModel(impl.messageModel());// 消费者启动的时候从哪里开始消费, 在 ConsumeFromWhere 这个类中consumerData.setConsumeFromWhere(impl.consumeFromWhere());// 订阅信息,包括过滤消息相关标签、SQL规则consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());consumerData.setUnitMode(impl.isUnitMode());// 加入消费者的心跳数据集合中heartbeatData.getConsumerDataSet().add(consumerData);}}// 生产者的心跳for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {MQProducerInner impl = entry.getValue();if (impl != null) {// 生产者组名ProducerData producerData = new ProducerData();producerData.setGroupName(entry.getKey());// 添加到生产者集合里面heartbeatData.getProducerDataSet().add(producerData);}}return heartbeatData;
}
对于消费者要设置的心跳信息为:
- groupName:消费者组
- consumeType:消费类型,PULL 和 PUSH
- messageModel:消费模式,CLUSTERING 和 BROADCASTING, 也就是集群和广播
- consumeFromWhere:消费者启动的时候从哪里开始消费, 在 ConsumeFromWhere 这个类中
- subscriptionDataSet:消费者组订阅信息,包括过滤消息相关标签、SQL规则
- unitMode
对于生产者要设置的心跳信息为:
- groupName:生产者组
4. sendHearbeat 发送心跳上报请求
/*** 发送心跳包给 broker 节点* @param addr* @param heartbeatData* @param timeoutMillis* @return* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException*/
public int sendHearbeat(final String addr,final HeartbeatData heartbeatData,final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {// 构建心跳请求,请求编码是 HEART_BEATRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);// 请求语言,默认是 JAVArequest.setLanguage(clientConfig.getLanguage());// 请求体,就是心跳包request.setBody(heartbeatData.encode());// 这里就是通过 Netty 发送心跳包了RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {// 返回结果,返回版本号return response.getVersion();}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
这个方法就是向 broker 发送心跳信息,可以看到发送之前在请求里面设置了一些属性:
- 请求语言,默认是 JAVA
- 心跳包,也就是上面的生产者和消费者
- 请求 Code 是 HEART_BEAT
最后请求是使用同步发送的,如果发送成功就返回 broker 返回的版本。
5. broker 处理心跳请求
broker 通过 ClientManageProcessor 处理器来处理心跳请求,处理的方法就是 heartBeat
。
5.1 heartBeat 处理心跳包
/*** 处理客户端心跳请求* @param ctx* @param request* @return*/
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {// 构建响应命令对象RemotingCommand response = RemotingCommand.createResponseCommand(null);// 解码,获取心跳包数据HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);// 构建客户端连接信息对象ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),heartbeatData.getClientID(),request.getLanguage(),request.getVersion());// 处理心跳包中的消费者信息for (ConsumerData data : heartbeatData.getConsumerDataSet()) {// 从 broker 的缓存中找出当前消费者组的订阅组配置SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());// 当 Consumer 发生变化的时候是否需要通知组内其他的 Consumerboolean isNotifyConsumerIdsChangedEnable = true;if (null != subscriptionGroupConfig) {// 从配置中获取 isNotifyConsumerIdsChangedEnable,默认就是 trueisNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();int topicSysFlag = 0;if (data.isUnitMode()) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);}// 消息消费的重试队列,%RETRY%groupNameString newTopic = MixAll.getRetryTopic(data.getGroupName());// 创建消息消费重试队列this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);}// 注册消费者boolean changed = this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable);if (changed) {log.info("registerConsumer info changed {} {}",data.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}}// 注册生产者for (ProducerData data : heartbeatData.getProducerDataSet()) {this.brokerController.getProducerManager().registerProducer(data.getGroupName(),clientChannelInfo);}// 设置返回结果response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;
}
这个方法会分别处理消费者和生产者的心跳信息,对于生产者比较简单,就是调用 registerProducer
注册生产者即可,但是对于消费者就需要给消费者组建立一个重传队列,然后再注册消费者。
重试队列就是消费者消费消息失败的时候会把消息发送到这个队列进行重试,重试队列是以消费者组为维度的,也就是说消费者重传是以消费者组为维度的,一个消费者组里面的消费者共享一个重传队列。
5.2 createTopicInSendMessageBackMethod 创建重传 topic
/*** 创建重传 topic,持久化到配置文件中,文件地址 ${user.home}/store/config/topics.json* @param topic // 待创建的 topic* @param clientDefaultTopicQueueNums // topic 下面的默认队列数, 默认 1* @param perm // 队列权限, 默认读写都有* @param topicSysFlag // topic 标识* @return*/
public TopicConfig createTopicInSendMessageBackMethod(final String topic,final int clientDefaultTopicQueueNums,final int perm,final int topicSysFlag) {// 从 topic 配置中获取下这个 topicTopicConfig topicConfig = this.topicConfigTable.get(topic);// 如果已经存在配置了,那么直接返回if (topicConfig != null)return topicConfig;boolean createNew = false;try {// 新建的时候需要加锁,防止创建相同的 topic 互相覆盖if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {// 再次获取 topic 信息,双重检查锁topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;// 这里就是新建配置topicConfig = new TopicConfig(topic);// 重传队列的读写队列数都是 1topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);// 权限默认是读写topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);log.info("create new topic {}", topicConfig);// 添加到 topicConfigTable 集合里面this.topicConfigTable.put(topic, topicConfig);createNew = true;// 获取下一个版本,这个版本用于标识当前 topicConfigTable 有没有发生变化,比如在从节点同步 topicConfigTable 的// 时候就可以使用版本和本地存储的版本进行队列,如果发生了变化再重新写入文件中this.dataVersion.nextVersion();// 持久化到文件中,文件地址: ${user.home}/store/config/topics.jsonthis.persist();} finally {// 解锁this.topicConfigTableLock.unlock();}}} catch (InterruptedException e) {log.error("createTopicInSendMessageBackMethod exception", e);}// 如果创建了新的 topicif (createNew) {// 注册 broker 信息this.brokerController.registerBrokerAll(false, true, true);}return topicConfig;
}
创建重传 topic 时首先从 broker 的本地缓存 topicConfigTable 中获取 topic 的配置,然后如果已经存在 topic 配置了,就直接返回,否则才去创建。
在新建 topic 的时候需要加锁,防止并发创建 topic,可以看到重传 topic 设置的属性如下:
- 读写队列数都是 1
- 权限默认是可读写
- topic 系统标识
创建出来之后设置到本地缓存 topicConfigTable
中,就代表创建成功了,然后更新版本 dataVersion
,接着持久化到文件中,文件地址: ${user.home}/store/config/topics.json,更新版本就是因为 broker 需要向 NameServer 注册 topic 配置信息,把版本也传过去标识 broker 心跳版本,可以说 broker 的心跳就是 topic 配置信息。
最后如果创建了新的 topic,就会像 NameServer 注册 topic 信息,registerBrokerAll 的逻辑可以看这篇文章:【RocketMQ Broker 相关源码】-注册 broker 信息到所有的 NameServer。
5.3 registerConsumer 注册消费者
/*** 注册消费者* @param group 消费者组* @param clientChannelInfo 客户端连接通道* @param consumeType 消费类型(PULL 或者 PUSH)* @param messageModel 消费模式(集群或者广播)* @param consumeFromWhere 消费点位* @param subList 消费者组订阅信息, 一个消费者组里面的消费者可以订阅多个 topic 下面的消息* @param isNotifyConsumerIdsChangedEnable 消费者组里面的消费者发生变化时是否需要通知其他消费者重平衡* @return*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {// 获取消费者组信息ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null == consumerGroupInfo) {// 创建一个新的 ConsumerGroupInfoConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);// 如果不存在才新增ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);consumerGroupInfo = prev != null ? prev : tmp;}// 是否在这个消费者组里面新增了消费者连接boolean r1 =consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);// 是否更新了消费者组订阅信息boolean r2 = consumerGroupInfo.updateSubscription(subList);// 如果发生了变更if (r1 || r2) {if (isNotifyConsumerIdsChangedEnable) {// 通知消费者重平衡this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}}// 处理消费者注册事件this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);// 是否发生了变更return r1 || r2;
}
注册消费者需要处理两件事,首先就是注册消费者连接到 channelInfoTable
集合中,channelInfoTable
集合用于管理消费者组下面的连接信息,key 是连接,value 是 ClientChannelInfo,是连接信息。
// 消费者组下面的连接信息
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =new ConcurrentHashMap<Channel, ClientChannelInfo>(16);public class ClientChannelInfo {// 连接通道private final Channel channel;// 客户端 IDprivate final String clientId;// 一般是 JAVAprivate final LanguageCode language;// 版本private final int version;// 上一次上报心跳的事件private volatile long lastUpdateTimestamp = System.currentTimeMillis();...
}
然后就是更新消费者组的订阅信息,就是本地 subscriptionTable
集合。
5.3.1 updateChannel 更新消费者连接通道
/*** 更新消费者连接* @param infoNew 客户端连接通道* @param consumeType 消费类型(PULL 或者 PUSH)* @param messageModel 消费模式(集群或者广播)* @param consumeFromWhere 消费点位* @return*/
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {boolean updated = false;this.consumeType = consumeType;this.messageModel = messageModel;this.consumeFromWhere = consumeFromWhere;// 获取原来的连接ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());if (null == infoOld) {// 如果不存在就新增ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);if (null == prev) {log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,messageModel, infoNew.toString());// 标记设置为 trueupdated = true;}infoOld = infoNew;} else {// 如果已存在就判断需不需要更新if (!infoOld.getClientId().equals(infoNew.getClientId())) {// 这里就是出现 BUG 了, 因为正常来说一个连接通道和一个 clientId 对应, 记录下日志log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",this.groupName,infoOld.toString(),infoNew.toString());// 重新修正 channelInfoTable 里面的映射关系this.channelInfoTable.put(infoNew.getChannel(), infoNew);}}// 更新 lastUpdateTimestamp 属性, 表示当前消费者组上一次上报心跳的时间this.lastUpdateTimestamp = System.currentTimeMillis();infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);return updated;
}
更新连接通道的逻辑就是更新 channelInfoTable 集合,然后更新下 lastUpdateTimestamp,因为这个方法是在 ConsumerGroupInfo 中的,所以更新的 lastUpdateTimestamp 就代表这个消费者组上一次上报心跳时间。更新这个属性是因为 broker 有一个定时任务
5.3.2 updateSubscription 更新订阅消息
public boolean updateSubscription(final Set<SubscriptionData> subList) {boolean updated = false;// 1. 遍历所有订阅信息, 将 subList 设置到 subscriptionTable 中for (SubscriptionData sub : subList) {// 获取原来的订阅信息SubscriptionData old = this.subscriptionTable.get(sub.getTopic());if (old == null) {// 如果获取不到就新建SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);if (null == prev) {updated = true;log.info("subscription changed, add new topic, group: {} {}",this.groupName,sub.toString());}// 如果当前新增的订阅信息版本比原来的要大且消费者类型是 PUSH} else if (sub.getSubVersion() > old.getSubVersion()) {if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {log.info("subscription changed, group: {} OLD: {} NEW: {}",this.groupName,old.toString(),sub.toString());}// 更新订阅信息集合, 这种情况下关系到消费者拉取消息就要更新, 如果是 PULL 类型由于是用户控制就不需要更新this.subscriptionTable.put(sub.getTopic(), sub);}}// 2. 删除不存在的订阅信息Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();while (it.hasNext()) {// 遍历所有订阅信息Entry<String, SubscriptionData> next = it.next();String oldTopic = next.getKey();boolean exist = false;for (SubscriptionData sub : subList) {if (sub.getTopic().equals(oldTopic)) {// 如果 subscriptionTable 里面存储的 topic 订阅信息不在 subList 集合中, 说明消费者没有上报过来exist = true;break;}}if (!exist) {// 如果不存在, 说明这个订阅信息被修改了log.warn("subscription changed, group: {} remove topic {} {}",this.groupName,oldTopic,next.getValue().toString());// 删掉这个订阅信息it.remove();updated = true;}}// 设置上报心跳的时间this.lastUpdateTimestamp = System.currentTimeMillis();return updated;
}
更新订阅消息流程分为两大步,首先是遍历所有订阅信息, 将消费者上报的 subList 设置到 subscriptionTable 中,如果 broker 中没有存储这个订阅消息,又或者存储的这个订阅信息版本过期了,就会更新到 subscriptionTable 中。
然后就是删除不存在的订阅信息,比如原来 subscriptionTable 存在 topicA -> subA
和 topicB -> subB
订阅信息,而消费者上报过来的消费者组订阅信息是 topicA -> subA
,那么 topicB -> subB
就会被删掉,这也透露一件事,就是消费者组里面的消费者订阅关系需要一直,具体可以看官方的这篇文章:RocketMQ 订阅关系。
5.3.3 通知消费者重平衡
当消费者组里面新增了消费者又或者消费者的订阅关系发生了变化,比如又订阅了多一个 topic,这种情况下会通过 this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel())
通知消费者进行重平衡。当然这个方法在 【RocketMQ Broker 相关源码】- NettyRemotingClient 和 NettyRemotingServer 这篇文章的 3.3.1 有讲解,所以这里不再多说。
5.3.4 处理消费者注册事件
同样的注册事件也是通过 this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel())
来处理,而这里的注册主要指的是消费者过滤信息的注册,就是 SQL92 过滤信息的注册。
可以看到就是通过 register 方法去注册过滤信息,下面就从这个方法入手。
5.3.4.1 register 注册消费者过滤信息
/*** 注册消费者组过滤信息* @param consumerGroup* @param subList*/
public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {for (SubscriptionData subscriptionData : subList) {// 一个一个注册register(subscriptionData.getTopic(),consumerGroup,subscriptionData.getSubString(),subscriptionData.getExpressionType(),subscriptionData.getSubVersion());}// 获取消费者组过滤信息Collection<ConsumerFilterData> groupFilterData = getByGroup(consumerGroup);Iterator<ConsumerFilterData> iterator = groupFilterData.iterator();// 遍历旧的过滤信息while (iterator.hasNext()) {ConsumerFilterData filterData = iterator.next();boolean exist = false;for (SubscriptionData subscriptionData : subList) {// 现在新上报的订阅信息是已经存在的if (subscriptionData.getTopic().equals(filterData.getTopic())) {// 信息存在, 不需要删除exist = true;break;}}// 如果这个过滤信息已经不再上报了, 将原来的过滤信息过期事件设置为当前时间, 相当于懒删除了if (!exist && !filterData.isDead()) {filterData.setDeadTime(System.currentTimeMillis());// 日志输出log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);}}
}/*** 获取消费者组的过滤信息* @param consumerGroup* @return*/
public Collection<ConsumerFilterData> getByGroup(final String consumerGroup) {Collection<ConsumerFilterData> ret = new HashSet<ConsumerFilterData>();// 遍历 filterDataByTopicIterator<FilterDataMapByTopic> topicIterator = this.filterDataByTopic.values().iterator();while (topicIterator.hasNext()) {FilterDataMapByTopic filterDataMapByTopic = topicIterator.next();// 获取 topic 下面的消费者过滤信息Iterator<ConsumerFilterData> filterDataIterator = filterDataMapByTopic.getGroupFilterData().values().iterator();// 遍历这些过滤信息while (filterDataIterator.hasNext()) {ConsumerFilterData filterData = filterDataIterator.next();// 如果跟参数消费者组一样, 说明这个 topic 是这个消费者组需要消费的if (filterData.getConsumerGroup().equals(consumerGroup)) {// 添加到返回集合中ret.add(filterData);}}}return ret;
}
客户端上报过来的心跳信息里面的消费者订阅信息包括了消费者的一些过滤信息,之前也说过了消费者组里面的消费者订阅关系需要保持一致,所以消费者上报过来的订阅信息就可以认为是这个消费者组的订阅信息,因此可以看到遍历 subList
一个一个注册之后,需要将旧的过滤信息里面没有上报过来的删掉,不过这里的删掉是懒删除,只是设置下 ConsumerFilterData#deadTime
为当前时间,就表示这个过滤信息已经过期了。
而 getByGroup
就是获取这个消费者组的消费者过滤信息,获取的逻辑是遍历 filterDataByTopic
集合来获取。filterDataByTopic 以 topic 为 key,因为一个 topic 可以被多个消费者组下面的消费者去消费,所以 value 是 FilterDataMapByTopic
对象,这个对象里面的属性是一个 ConcurrentMap 集合和 topic,集合的 key 是消费者组,value 是消费者组的过滤信息。
/*** 消费者过滤信息, 一个 topic 可以被多个消费者组下面的消费者去消费, 所以这里是以 topic 为 key*/
private ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>filterDataByTopic = new ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>(256);public static class FilterDataMapByTopic {/*** 消费者组的过滤信息*/private ConcurrentMap<String/*consumer group*/, ConsumerFilterData>groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();private String topic;...
}
getByGroup 的遍历逻辑就是遍历所有 topic 下面的 FilterDataMapByTopic,然后继续遍历 FilterDataMapByTopic 里面的 groupFilterData 属性,判断这个 topic 是否被这个消费者组消费,逻辑并不复杂,就是里面的集合有点绕。
5.3.4.2 register 创建 FilterDataMapByTopic 对象
继续回到 register 方法,这个方法就是负责创建出 FilterDataMapByTopic,上面我们也说了 filterDataByTopic 以 FilterDataMapByTopic 为 value,所以这个方法主要还是创建出 FilterDataMapByTopic 对象。
/*** 注册 SQL92 信息到消费者组集合 filterDataByTopic 中* @param topic* @param consumerGroup* @param expression* @param type* @param clientVersion* @return*/
public boolean register(final String topic, final String consumerGroup, final String expression,final String type, final long clientVersion) {// 如果是 TAG 类型的过滤信息, 直接返回if (ExpressionType.isTagType(type)) {return false;}// 如果 SQL92 过滤表达式为空, 直接返回if (expression == null || expression.length() == 0) {return false;}// 获取 topic 下面的过滤信息集合, 注意这里就是消费者组下面一个 topic 一个过滤信息集合FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);if (filterDataMapByTopic == null) {// 不存在就新建一个FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);filterDataMapByTopic = prev != null ? prev : temp;}// 计算出 consumerGroup#topic 的布隆过滤器信息// 1.经过 k 次 hash 求出来的 k 位// 2.布隆过滤器总共多少位BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);// 注册过滤信息到 filterDataMapByTopic 中return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
}
register 方法会在一开始就判断如果过滤类型是 TAG 就不需要注册,那就是说明 filterDataByTopic
这个集合 只存储 SQL92 过滤类型的过滤数据。
这个方法逻辑不多,值得关注的是 bloomFilter.generate
,可以看到对传入的 consumerGroup#topic
进行 hash 之后生成布隆过滤器信息,下面就看下这个布隆过滤器信息包括什么。
在讲解 generate 方法前,我们来想一下布隆过滤器有什么重要信息:
- hash 函数个数 k
- bit 数组长度 m
- 误差率 fpp
对于布隆过滤器,如果要往里面设置一个字符串,就需要先通过 k 个哈希函数求出 k 个位,然后将布隆过滤器的这 k 个位设置为1,如果感兴趣可以看这篇文章:详细说说布隆过滤器 BloomFilter。
对于消费者过滤信息,创建的 BloomFilterData 包括两个重要信息:这 k 个 hash 函数求出来的 bit 数组(长度为 k)
和 bit 数组长度 m
。
5.3.4.3 register 注册过滤信息
上面创建好布隆过滤器信息之后,最终调用 register 注册过滤信息,而注册过滤信息主要就是往 groupFilterData 集合设置,上面也说了,groupFilterData 集合存储的是 consumerGroup -> ConsumerFilterData
的映射关系。
/*** 注册过滤信息* @param consumerGroup 消费者组* @param expression SQL92 过滤表达式* @param type 过滤类型, SQL92* @param bloomFilterData 布隆过滤器数据* @param clientVersion 订阅信息版本* @return*/
public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData,long clientVersion) {// 获取旧的过滤信息ConsumerFilterData old = this.groupFilterData.get(consumerGroup);if (old == null) {// 创建一个新的 ConsumerFilterDataConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {return false;}// 设置布隆过滤器信息consumerFilterData.setBloomFilterData(bloomFilterData);// 添加到 groupFilterData 中old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);if (old == null) {// 这里就是新增成功, 直接返回log.info("New consumer filter registered: {}", consumerFilterData);return true;} else {// 并发添加了, 注意 FilterDataMapByTopic 是以 topic 为维度的, 一个 topic 可以被多个消费者组消费if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {// 打印下日志, 意思就是并发添加的这两过滤信息还不一样log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);}// 版本一样, 另一个线程添加的过滤信息过期了, 就直接设置下过期时间为 0, 重新启用if (clientVersion == old.getClientVersion() && old.isDead()) {reAlive(old);return true;}// 版本一样同时另一个线程添加的没有过期或者当前线程版本比较低, 当前线程就注册失败了return false;} else {// 这里就是新增的版本比原来的要高, 直接覆盖原来的this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);return true;}}} else {// 原来过滤信息已经存在了且当前添加的版本 <= 原来的if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {// 打印下日志, 意思就是过滤表达式不一样log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);}// 如果跟原来的版本一样就直接重新启用, 逻辑和上面的一样if (clientVersion == old.getClientVersion() && old.isDead()) {reAlive(old);return true;}// 添加的版本比原来的版本还要低或者相同但是原来的没有过期return false;}// 这里就是添加的版本比原来的高, 但是还是得看下过滤信息是否有变化, 首先是过滤表达式和类型boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);// 然后是布隆过滤器信息是否发生了变化if (old.getBloomFilterData() == null && bloomFilterData != null) {change = true;}if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {change = true;}// 过滤表达式发生了变化if (change) {// 创建新的过滤表达式ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {// new expression compile error, remove old, let client report error.// 这里就是过滤表达式有问题, 将旧的删掉, 然后直接返回 false 表示注册失败, 所以说如果表达式有问题也会把旧的删掉this.groupFilterData.remove(consumerGroup);return false;}// 设置布隆过滤器数据consumerFilterData.setBloomFilterData(bloomFilterData);// 创建成功, 添加到 groupFilterData 中this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("Consumer filter info change, old: {}, new: {}, change: {}",old, consumerFilterData, change);return true;} else {// 过滤表达式没有发生变化, 直接设置版本号old.setClientVersion(clientVersion);// 如果旧的已经过期了, 重新启用if (old.isDead()) {reAlive(old);}return true;}}
}protected void reAlive(ConsumerFilterData filterData) {// 重新设置过期时间为 0long oldDeadTime = filterData.getDeadTime();filterData.setDeadTime(0);log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, oldDeadTime);
}
里面的注释都写得比较清楚,也可以看到这里面维护 groupFilterData 集合是根据两种情况来维护,一种是原来没有,一种是原来有的,如果是原来集合里面就有 group -> ConsumerFilterData
的映射,就需要判断当前上报的过滤信息版本和原来的对比,如果版本不是最新的就会被覆盖,一般来说如果过滤信息不变版本都不会变。
然后在这个方法中也可以看到当 old == null
的时候应该是为了防止并发,使用 putIfAbsent
添加成功之后返回 old,还要判断这个 old 是不是已经存在了,如果存在还需要对比添加的版本来决定留下哪个。
最后注意下里面的 reAlive 方法,这个方法就是将一个过期的过滤信息变可用,过滤信息 ConsumerFilterData
里面有一个属性 deadTime
表示过期时间,如果设置成不为 0 就表示过期了,所以这个 reAlive 方法就是重新将这个属性设置为 0,表示复用里面的过滤信息。
6. 小结
好了,这篇文章中我们探讨了上报心跳信息到 broker 的源码,可以看到上传的心跳信息里面不单单包括生产者,也包括消费者的,所以这个方法是生产者和消费者的共用方法。
而 broker 处理心跳的时候对于消费者不单单要处理消费者连接信息,同时也要处理消费者组订阅消息以及 SQL92 过滤信息,也就是上面的 5.3 小节。
如有错误,欢迎指出!!!!
相关文章:

【RocketMQ 生产者和消费者】- 生产者启动源码-上报生产者和消费者心跳信息到 broker(3)
文章目录 1. 前言2. sendHeartbeatToAllBrokerWithLock 上报心跳信息3. prepareHeartbeatData 准备心跳数据4. sendHearbeat 发送心跳上报请求5. broker 处理心跳请求5.1 heartBeat 处理心跳包5.2 createTopicInSendMessageBackMethod 创建重传 topic5.3 registerConsumer 注册…...

Python----循环神经网络(Word2Vec的优化)
一、负采样 基本思想: 在训练过程中,对于每个正样本(中心词和真实上下文词组成的词对),随机采样少量(如5-20个)负样本(中心词与非上下文词组成的词对)。 模型通过区分正…...

Simon J.D. Prince《Understanding Deep Learning》
学习神经网络和深度学习推荐这本书,这本书站位非常高,且很多问题都深入剖析了,甩其他同类书籍几条街。 多数书,不深度分析、没有知识体系,知识点零散、章节之间孤立。还有一些人Tian所谓的权威,醒醒吧。 …...

开搞:第四个微信小程序:图上县志
原因:我换了一个微信号来搞,因为用同一个用户,备案只能一个个的来。这样不行。所以我换了一个。原来注册过小程序。现在修改即可。注意做好计划后,速度备案和审核,不然你时间浪费不起。30元花起。 结构: -…...
模型评估与调优(PyTorch)
文章目录 模型评估方法混淆矩阵混淆矩阵中的指标ROC曲线(受试者工作特征)AUCR平方残差均方误差(MSE)均方根误差(RMSE)平均绝对误差(MAE) 模型调优方法交叉验证(CV&#x…...
sockaddr结构体详解
在网络编程中,sockaddr 结构体用于表示套接字的地址信息。由于不同协议(如 IPv4、IPv6、Unix 域套接字)的地址格式不同,实际使用中通常通过以下三种变体结构来处理不同类型的地址: 1. 通用地址结构:struct …...

Seata源码—7.Seata TCC模式的事务处理一
大纲 1.Seata TCC分布式事务案例配置 2.Seata TCC案例服务提供者启动分析 3.TwoPhaseBusinessAction注解扫描源码 4.Seata TCC案例分布式事务入口分析 5.TCC核心注解扫描与代理创建入口源码 6.TCC动态代理拦截器TccActionInterceptor 7.Action拦截处理器ActionIntercept…...

【语法】C++的map/set
目录 平衡二叉搜索树 set insert() find() erase() swap() map insert() 迭代器 erase() operator[] multiset和multimap 在之前学习的STL中,string,vector,list,deque,array都是序列式容器,它们的…...
【FAQ】HarmonyOS SDK 闭源开放能力 —Live View Kit (3)
1.问题描述: 通过Push Kit创建实况窗之后,再更新实况窗失败,平台查询提示“实况窗端更新失败,通知未创建或已经过期”。 解决方案: 通过Push Kit更新实况窗内容的过程是自动更新的。客户端在创建本地实况窗后&#…...

vue vite textarea标签按下Shift+Enter 换行输入,只按Enter则提交的实现思路
注意input标签不能实现,需要用textarea标签 直接看代码 <template><textareav-model"message"keydown.enter"handleEnter"placeholder"ShiftEnter 换行,Enter 提交"></textarea> </template>&l…...
MySQL多线程备份工具mysqlpump详解!
MySQLPUMP备份工具详解 1. 概述 MySQLPump 是 MySQL 5.7 引入的一个客户端备份工具,用于替代传统的 mysqldump 工具。它提供了并行处理、进度状态显示、更好的压缩支持等新特性,能够更高效地执行 MySQL 数据库备份操作。 2. 主要特性 并行处理&#x…...
创建信任所有证书的HttpClient:Java 实现 HTTPS 接口调用,等效于curl -k
在 Java 生态中,HttpClient 和 Feign 都是调用第三方接口的常用工具,但它们的定位、设计理念和使用场景有显著差异。以下是详细对比: DIFF1. 定位与抽象层级 特性HttpClientFeign层级底层 HTTP 客户端库(处理原始请求/响应&#…...
Redisson分布式集合原理及应用
Redisson是一个用于Redis的Java客户端,它简化了复杂的数据结构和分布式服务的使用。 适用场景对比 数据结构适用场景优点RList消息队列、任务队列、历史记录分布式共享、阻塞操作、分页查询RMap缓存、配置中心、键值关联数据支持键值对、分布式事务、TTLRSet去重集…...

深入理解 PlaNet(Deep Planning Network):基于python从零实现
引言:基于模型的强化学习与潜在动态 基于模型的强化学习(Model-based Reinforcement Learning)旨在通过学习环境动态的模型来提高样本效率。这个模型可以用来进行规划,让智能体在不需要与真实环境进行每一次决策交互的情况下&…...
精益数据分析(75/126):用户反馈的科学解读与试验驱动迭代——Rally的双向验证方法论
精益数据分析(75/126):用户反馈的科学解读与试验驱动迭代——Rally的双向验证方法论 在创业的黏性阶段,用户反馈是优化产品的重要依据,但如何避免被表面反馈误导?如何将反馈转化为可落地的迭代策略&#x…...

仿腾讯会议——视频发送接收
1、 添加音频模块 2、刷新图片,触发重绘 3、 等比例缩放视频帧 4、 新建视频对象 5、在中介者内定义发送视频帧的函数 6、完成发送视频的函数 7、 完成开启/关闭视频 8、绑定视频的信号槽函数 9、 完成开启/关闭视频 10、 完成发送视频 11、 完成刷新图片显示 12、完…...

从3.7V/5V到7.4V,FP6291在应急供电智能门锁中的应用
在智能家居蓬勃发展的当下,智能门锁以其便捷、安全的特性,成为现代家庭安防的重要组成部分。在智能门锁电量耗尽的情况下,应急电源外接移动电源(USB5V输入) FP6291升压到7.4V供电可应急开锁。增强用户在锁具的安全性、…...
java后端-海外登录(谷歌/FaceBook)
前言 由于最近公司的项目要在海外运行,因此需要对接海外的登录,目前就是谷歌和facebook两种,后面支付也是需要的,后续再进行书写 谷歌登录 这个相对比较容易,而且只提供给安卓即可,废话就不多说了,直接贴解决方案 引入maven依赖 <dependency> <groupId>com.go…...

【人工智障生成日记1】从零开始训练本地小语言模型
🎯 从零开始训练本地小语言模型:MiniGPT TinyStories(4090Ti) 🧭 项目背景 本项目旨在以学习为目的,从头构建一个完整的本地语言模型训练管线。目标是: ✅ 不依赖外部云计算✅ 完全本地运行…...

Selenium-Java版(frame切换/窗口切换)
frame切换/窗口切换 前言 切换到frame 原因 解决 切换回原来的主html 切换到新的窗口 问题 解决 回到原窗口 法一 法二 示例 前言 参考教程:Python Selenium Web自动化 2024版 - 自动化测试 爬虫_哔哩哔哩_bilibili 上期文章:Sel…...

一文深度解析:Pump 与 PumpSwap 的协议机制与技术差异
在 Solana 链上,Pump.fun 和其延伸产品 PumpSwap 构成了 meme coin 发行与流通的两大核心场景。从初期的游戏化发行模型,到后续的自动迁移与交易市场,Pump 系列协议正在推动 meme coin 从“爆发性投机”走向“协议化运营”。本文将从底层逻辑…...

星云智控v1.0.0产品发布会圆满举行:以创新技术重构物联网监控新生态
星云智控v1.0.0产品发布会圆满举行:以创新技术重构物联网监控新生态 2024年5月15日,成都双流蛟龙社区党群服务中心迎来了一场备受业界瞩目的发布会——优雅草科技旗下”星云智控v1.0.0”物联网AI智控系统正式发布。本次发布会吸引了包括沃尔沃集团、新希…...

SpringBoot(一)--- Maven基础
目录 前言 一、初始Maven 1.依赖管理 2.项目构建 3.统一项目结构 二、IDEA集成Maven 1.Maven安装 2.创建Maven项目 2.1全局设置 2.2 创建SpringBoot项目 2.3 常见问题 三、单元测试 1.JUnit入门 2.断言 前言 Maven 是一款用于管理和构建Java项目的工具ÿ…...

基于FPGA控制电容阵列与最小反射算法的差分探头优化设计
在现代高速数字系统测试中,差分探头的信号完整性直接影响测量精度。传统探头存在阻抗失配导致的信号反射问题,本文提出一种通过FPGA动态控制电容阵列,结合最小反射算法的优化方案,可实时调整探头等效容抗,将信号反射损…...

kakfa 基本了解
部署结构 Kafka 使用zookeeper来协商和同步,但是kafka 从版本3.5正式开始deprecate zookeeper, 同时推荐使用自带的 kraft. 而从4.0 开始则不再支持 zookeeper。 所以 kafka 是有control plane 和 data plane 的。 data plane 就是broker,control plane…...
基于Browser Use + Playwright 实现AI Agent操作Web UI自动化
Browser Use是什么 Browser Use是一个开源项目官网:Browser Use - Enable AI to control your browser,专为大语言模型(LLM)设计的只能浏览器工具,能够让AI像人类一样自然的浏览和操作网页,支持多标签页管…...

Origin绘制多因子柱状点线图
多因子柱状点线图是一种结合柱状图和点线图的复合图表,常用于同时展示多个因子(变量)在不同分组下的分布和趋势变化。 适用场景: (1)比较多个因子在不同分组中的数值大小(柱状图)&a…...

Web漏洞扫描服务的特点与优势:守护数字时代的安全防线
在数字化浪潮中,Web应用程序的安全性已成为企业业务连续性和用户信任的核心要素。随着网络攻击手段的不断升级,Web漏洞扫描服务作为一种主动防御工具,逐渐成为企业安全体系的标配。本文将从特点与优势两方面,解析其价值与应用场景…...
iOS 直播技术及优化
iOS直播技术的实现和优化涉及多个技术环节,需结合协议选择、编解码方案、播放器技术及性能调优等多方面。 一、核心技术实现 协议选择与传输优化 HLS(HTTP Live Streaming):苹果官方推荐,基于HTTP分片传输,…...

抛弃传统P2P技术,EasyRTC音视频基于WebRTC打造教育/会议/远程巡检等场景实时通信解决方案
一、方案背景 随着网络通信发展,实时音视频需求激增。传统服务器中转方式延迟高、资源消耗大,WebP2P技术由此兴起。EasyRTC作为高性能实时通信平台,集成WebP2P技术,实现低延迟、高效率音视频通信,广泛应用于教育、医疗…...