SpringCloud 微服务消息队列灰度方案 (RocketMQ 4.x)
目录
- 背景
- 遇到的问题
- RocketMQ 基础
- 基础消息模型
- 扩展后的消息模型
- 部署模型
- 相关概念点
- 方案对比
- 影子Topic的方案
- Tag的方案
- UserProperty的方案
- 影子Group的方案
- 灰度分区的方案
- 方案对比
- 灰度分区方案设计
- 适配只有部分灰度的情况所做的功能扩展
- 消费者(无灰度)
- 消费者(有灰度)
- 改造流程
- 生产者改造逻辑
- 消费者改造逻辑
- 消费者自定义负载均衡
- MQ注册ClientID修改
- 常量配置类
- 链路信息传递`BasicMDC`类
- MessageStorage类
- 拦截配置
- 消费者拦截
- 生产者拦截
- 请求拦截
- 验证过程
- 消费者(未启动灰度订阅)
- 消费者(灰度订阅)
- 验证消息链路
- 发送端
- 消费端
- 发送灰度消息
背景
- 我们公司团队为了更好地控制版本发布的影响范围,自研了灰度发布流程,目前已经支持
http、gRPC等接口调用方式进行灰度流量转发,但是消息队列基于业务实现展示不支持。 - 参考了网上很多灰度方案博文,比较热门的vivo
鲁班RocketMQ的文章,但是涉及到的改造范围都是比较大,最后在看到关于Asyncer关于灰度分区 的博文,整体比较巧妙的利用MQ的原有机制,改动相对少了很多
遇到的问题

如上图所示普通的业务灰度流程,保证了RPC服务之间的调用灰度,但是当消息从服务发出到消息队列后,队列分区是被均分到正常服务和灰度服务监听的;这样会导致正常服务消费到灰度消息;同时灰度服务消费到正常的消息;所以MQ灰度是目前需要解决的问题,这样才能完成整个灰度链路;
RocketMQ 基础
基础消息模型

扩展后的消息模型

- 相同的ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式,和集群模式(图中是最常用的集群模式)。
- 在集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,如图中 ConsumerGroupA 订阅 TopicA,TopicA 对应 3个队列,则 GroupA 中的 Consumer1 消费的是 MessageQueue 0和 MessageQueue 1的消息,Consumer2是消费的是MessageQueue2的消息。
- 在广播模式下,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列。需要注意的是,广播模式下因为每个 Consumer 实例都需要处理全部的消息,因此这种模式仅推荐在通知推送、配置同步类小流量场景使用。
部署模型
Producer、Consumer又是如何找到Topic和Broker的地址呢?消息的具体发送和接收又是怎么进行的呢?

RocketMQ 部署架构上主要分为四部分:
-
生产者 Producer
- 发布消息的角色。Producer通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。
-
消费者 Consumer
- 消息消费的角色
- 支持以推(push),拉(pull)两种模式对消息进行消费。
- 同时也支持集群方式和广播方式的消费。
- 提供实时消息订阅机制,可以满足大多数用户的需求。
-
名字服务器 NameServer
- NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
- 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
- NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
-
代理服务器 Broker
- Broker主要负责消息的存储、投递和查询以及服务高可用保证。
- NameServer几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker部署相对复杂。
- 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
- Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
- Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。
相关概念点
CommitLog:消息体实际存储的地方,当我们发送的任一业务消息的时候,它最终会存储在commitLog上。MQ在Broker进行集群部署(这里为也简洁,不涉及主从部分)时,同一业务消息只会落到集群的某一个Broker节点上。而这个Broker上的commitLog就会存储所有Topic路由到它的消息,当消息数据量到达1个G后会重新生成一个新的commitLog。Topic:消息主题,表示一类消息的逻辑集合。每条消息只属于一个Topic,Topic中包含多条消息,是MQ进行消息发送订阅的基本单位。属于一级消息类型,偏重于业务逻辑设计。Tag:消息标签,二级消息类型,每一个具体的消息都可以选择性地附带一个Tag,用于区分同一个Topic中的消息类型Queue:实际上Topic更像是一个逻辑概念供我们使用,在源码层级看,Topic以Queue的形式分布在多个Broker上,一个topic往往包含多条Queue消费组及其ID:表示一类Producer或Consumer,这类Producer或Consumer通常生产或消费同应用域的消息,且消息生产与消费的逻辑一致。每个消费组可以定义全局维一的GroupID来标识,由它来代表消费组。不同的消费组在消费时互相隔离,不会影响彼此的消费位点计算。
方案对比
影子Topic的方案
- 新建一系列新的Topic来处理隔离灰度的消息
- 例如对于TOPIC_ORDER会创建TOPIC_ORDER_GRAY来给灰度环境使用。
- 发送方在发送时,对灰度的消息写入到影子Topic中。消费时,灰度环境只使用灰度的分组订阅灰度的Topic。

Tag的方案
- 发送方在发送时,对灰度环境产生的消息的Tag加
灰度标识。 - 消费方,每次灰度版本发布时只订阅
灰度Tag的消息,正常的版本订阅非灰度的Tag。

UserProperty的方案
- 发送方在发送时,对灰度环境产生的消息的UserProperty加灰度标识。
- 消费方的客户端需要进行改写,根据UserProperty来过滤,正常的版本会跳过这类消息,灰度的版本会处理灰度消息。
- 流程与灰度Tag差不错
影子Group的方案
- 发送消息:灰度的服务发送带有灰度标识的消息
- 消费消息:灰度服务只消费灰度标识的消息

灰度分区的方案
- 发送者注册时会检测当前将是不是灰度节点,灰度节点MQ注册的clientId会添加标识
- 消费者订阅时会检测当前节点是不是灰度节点,灰度节点会走灰度的Queue分配策略

方案对比
| 方案 | 优点 | 缺点 | 成本 |
|---|---|---|---|
| 影子Topic的方案 | 使用独立的两个TOPIC,订阅关系一致,改造比较容易 | 有临界问题,灰度切换生产时有可能会漏掉消息;同时需要根据生产、消费者关系维护对应的订阅关系;改造数据需求 | 需要维护两套消费组和TOPC,有维护成本 |
| 影子Group的方案 | 使用独立的两个GROUP,订阅关系一致 | 有临界问题;需要修改生产者、订阅者、DevOps改动范围较大;正常节点和灰度节点都需要知道当前服务的灰度状态,来做出对应的处理 | 需要维护两套GROUP的关系,维护成本高 |
| Tag的方案 | 通过MQ提供的tag机制过滤,可以保证灰度的消息只会被灰度节点消费,改造简单; | 如果Tag参与的业务过滤,不适合该方案;如果没有灰度节点订阅关系不一致,会出现消息丢失 | 无 |
| UserProperty的方案 | 同上 | 同上 | 同上 |
| 灰度分区的方案 | 订阅关系一致,不需要额外维护多一套TOPIC、GROUP; | 无(比较适合公司的一个运营模式) | 无 |
灰度分区方案设计
灰度分区主要基于以下几点:
DevOps:灰度服务的Pod容器内部会有CANARY_RELEASE:true的环境变量。MQ客户端心跳上报:源码中,RocketMQ客户端启动时会想向所有产生订阅关系的broker发送心跳,心跳中带有clientId,该值主要由实例名、容器ip等组成,可以利用canary环境变量做一层额外的注入MQ客户端重平衡:源码中,每隔20秒/客户端上下线,都会触发一次客户端重平衡;我们可以自定义该策略,加入灰度分区平衡逻辑,来实现灰度分区和正常分区的订阅MQ客户端发送方:源码中,RocketMQ发送方每次发送消息都会轮询队列发送,同时加入重试和故障规避的策略,可以通过重写该类来做扩展。
适配只有部分灰度的情况所做的功能扩展
- 基于ThreadLocal来实现消费和发送的链路标识传递
- 拦截HTTP请求:通过拦截http请求识别请求是否灰度,基于ThreadLocal实现线程传递;
- 拦截MQ消息消费:通过拦截消费者消息,识别
UserProperty识别存在灰度标识;更新到ThreadLocal中 - 拦截MQ生产者发送消息:获取
ThreadLocal存储的当前线程的UserProperty标识,写入到发送消息的UserProperty中
- MQ客户端发送方:检测到环境和线程链路变量,做出对应的发送策略;从而来实现
灰度消息通过正常应用后,发送MQ消息能被下级监听的灰度应用接收;
消费者(无灰度)

消费者(有灰度)

改造流程
生产者改造逻辑
无论何时,只要发送方自己当前环境是灰度(CANARY_RELEASE=true)或者当前是灰度链路,则会最后两个分区作为灰度队列,否则选取其他分区发送,根据RocketMQ的源码,自定义发送策略即可实现。
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.latency.LatencyFaultTolerance;
import org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;/*** Custom fault strategy*/
public class CustomMQFaultStrategy {private final InternalLogger log = ClientLogger.getLog();private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();private boolean sendLatencyFaultEnable = false;private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};private final ConcurrentHashMap<TopicPublishInfo, TopicPublishInfoCache> topicPublishInfoCacheTable = new ConcurrentHashMap<>();public long[] getNotAvailableDuration() {return notAvailableDuration;}public void setNotAvailableDuration(final long[] notAvailableDuration) {this.notAvailableDuration = notAvailableDuration;}public long[] getLatencyMax() {return latencyMax;}public void setLatencyMax(final long[] latencyMax) {this.latencyMax = latencyMax;}public boolean isSendLatencyFaultEnable() {return sendLatencyFaultEnable;}public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {this.sendLatencyFaultEnable = sendLatencyFaultEnable;}private TopicPublishInfoCache checkCacheChanged(TopicPublishInfo topicPublishInfo) {if (topicPublishInfoCacheTable.containsKey(topicPublishInfo)) {return topicPublishInfoCacheTable.get(topicPublishInfo);}synchronized (this) {TopicPublishInfoCache cache = new TopicPublishInfoCache();List<MessageQueue> canaryQueues = MessageStorage.getCanaryQueues(topicPublishInfo.getMessageQueueList());List<MessageQueue> normalQueues = MessageStorage.getNormalQueues(topicPublishInfo.getMessageQueueList());Collections.sort(canaryQueues);Collections.sort(normalQueues);cache.setCanaryQueueList(canaryQueues);cache.setNormalQueueList(normalQueues);topicPublishInfoCacheTable.putIfAbsent(topicPublishInfo, cache);}return topicPublishInfoCacheTable.get(topicPublishInfo);}/*** Queue selection strategy*/public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {List<MessageQueue> messageQueueList = tpInfo.getMessageQueueList();TopicPublishInfoCache topicPublishInfoCache = checkCacheChanged(tpInfo);if (MessageStorage.isCanaryRelease() || MessageStorage.isThreadCanaryRelease()) {MessageQueue messageQueue = selectDefaultMessageQueue(tpInfo, lastBrokerName, topicPublishInfoCache.getCanaryQueueList());log.debug("canary context,send message to canary queue:{}", messageQueue.getBrokerName() + messageQueue.getQueueId());return messageQueue;} else {if (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();int size = topicPublishInfoCache.getNormalQueueList().size();for (int i = 0; i < size; i++) {int pos = Math.max(Math.abs(index++) % size, 0);MessageQueue mq = topicPublishInfoCache.getNormalQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {return mq;}}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (!topicPublishInfoCache.getCanaryQueueList().contains(mq)) {if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;}} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}}return selectDefaultMessageQueue(tpInfo, lastBrokerName, topicPublishInfoCache.getNormalQueueList());}}/*** Default message queue selection strategy** @param topicPublishInfo* @param lastBrokerName* @param queues* @return {@link MessageQueue }*/private MessageQueue selectDefaultMessageQueue(final TopicPublishInfo topicPublishInfo, final String lastBrokerName,List<MessageQueue> queues) {ThreadLocalIndex sendWhichQueue = topicPublishInfo.getSendWhichQueue();int size = queues.size();if (lastBrokerName != null) {for (int i = 0; i < size; i++) {int index = sendWhichQueue.incrementAndGet();int pos = Math.max(Math.abs(index) % size, 0);MessageQueue mq = queues.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}}int i = sendWhichQueue.incrementAndGet();int res = Math.max(Math.abs(i) % size, 0);log.debug("selectDefaultMessageQueue, lastBrokerName:{}, res:{}", lastBrokerName, topicPublishInfo.getMessageQueueList().get(res));return queues.get(res);}public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}}private long computeNotAvailableDuration(final long currentLatency) {for (int i = latencyMax.length - 1; i >= 0; i--) {if (currentLatency >= latencyMax[i]) {return this.notAvailableDuration[i];}}return 0;}/*** Cache of topic publish info*/private static class TopicPublishInfoCache {/*** Grayscale message queue list*/private List<MessageQueue> canaryQueueList;private List<MessageQueue> normalQueueList;public List<MessageQueue> getCanaryQueueList() {return canaryQueueList;}public void setCanaryQueueList(List<MessageQueue> canaryQueueList) {this.canaryQueueList = canaryQueueList;}public List<MessageQueue> getNormalQueueList() {return normalQueueList;}public void setNormalQueueList(List<MessageQueue> normalQueueList) {this.normalQueueList = normalQueueList;}}
}
消费者改造逻辑
-
其实最大的问题在于消费方如何动态的感知灰度的状态流转,这也是产生之前灰度分区方案的临界问题的根本原因。但是通过源码的深入探索,发现其实我们可以通过改造ClientId和自定义负载均衡策略来实现;
-
RocketMQ客户端启动的时候,会构建本地客户端id(包括实例名、ip名等),然后向broker注册自己。我们可以通过DevOps注入的环境变量CANARY_RELEASE来做改造,即灰度服务clientId后面追加canary表示,default服务后面追加default标识;
消费者自定义负载均衡
import com.alibaba.fastjson.JSON;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;public class CanaryAllocateMessageQueueStrategyImpl implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {log.info("consumerGroup:{} currentCID:{} cidAll:{}",consumerGroup,currentCID, JSON.toJSONString(cidAll));if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return Collections.emptyList();}if (mqAll.stream().anyMatch(mq -> mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {return allocateByAvg(consumerGroup, currentCID, mqAll, cidAll);}if (!MessageStorage.hasCanaryRelease(cidAll)) {List<MessageQueue> allocate = allocateByAvg(consumerGroup, currentCID, mqAll, cidAll);return allocate;}if (MessageStorage.allCanaryRelease(cidAll)) {List<MessageQueue> messageQueues = this.balanceAllocate(consumerGroup, currentCID, mqAll, cidAll);log.info("[canary allocate]: group:{} sub topic:{} has all canary release client,maybe the sub is new,use the default avg strategy.\n" +"current cid:{}\n" +"allocate total {} message queue\n" +"result:\n{}",consumerGroup,mqAll.get(0).getTopic(),messageQueues.size(),currentCID,MessageStorage.joinMessageQueue(messageQueues));return messageQueues;}List<String> canaryCids = MessageStorage.getCanaryCids(cidAll);List<String> normalCids = MessageStorage.getNormalCids(cidAll);List<MessageQueue> canaryQueues = MessageStorage.getCanaryQueues(mqAll);List<MessageQueue> normalQueues = MessageStorage.getNormalQueues(mqAll);Collections.sort(canaryCids);Collections.sort(normalCids);Collections.sort(normalQueues);Collections.sort(canaryQueues);List<MessageQueue> result = null;if (canaryCids.contains(currentCID)) {result = allocateByAvg(consumerGroup, currentCID, canaryQueues, canaryCids);} else {result = allocateByAvg(consumerGroup, currentCID, normalQueues, normalCids);}return result;}/*** @param consumerGroup* @param currentCID* @param mqAll* @param cidAll* @return {@link List }<{@link MessageQueue }>*/private List<MessageQueue> allocateByAvg(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}int index = cidAll.indexOf(currentCID);int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}@Overridepublic String getName() {return "CANARY";}public boolean check(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (StringUtils.isEmpty(currentCID)) {throw new IllegalArgumentException("currentCID is empty");}if (CollectionUtils.isEmpty(mqAll)) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (CollectionUtils.isEmpty(cidAll)) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return false;}return true;}public List<MessageQueue> balanceAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}int index = cidAll.indexOf(currentCID);int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}
}
MQ注册ClientID修改
public String buildMQClientId() {StringBuilder sb = new StringBuilder();sb.append(this.getClientIP());sb.append("@");sb.append(this.getInstanceName());if (!UtilAll.isBlank(this.unitName)) {sb.append("@");sb.append(this.unitName);}//The key is hereif (MessageStorage.isCanaryRelease()) {sb.append(CustomCommonConstant.CANARY_RELEASE_PREFIX);} else {sb.append("@default");}if (this.enableStreamRequestType) {sb.append("@");sb.append(RequestType.STREAM);}return sb.toString();}
常量配置类
public class CustomCommonConstant {/*** Number of grayscale queues*/public static final Integer GRAYSCALE_QUEUE_SIZE = 2;/*** Grayscale client identification*/public static final String CANARY_RELEASE_PREFIX = "@canary";/*** Thread identification*/public static final String THREAD_CANARY_RELEASE = "canary.release";
}
链路信息传递BasicMDC类
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;public class BasicMDC {final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal<>();private static final int WRITE_OPERATION = 1;private static final int MAP_COPY_OPERATION = 2;final ThreadLocal<Integer> lastOperation = new ThreadLocal();private Integer getAndSetLastOperation(int op) {Integer lastOp = (Integer)this.lastOperation.get();this.lastOperation.set(op);return lastOp;}private boolean wasLastOpReadOrNull(Integer lastOp) {return lastOp == null || lastOp == 2;}private Map<String, String> duplicateAndInsertNewMap(Map<String, String> oldMap) {Map<String, String> newMap = Collections.synchronizedMap(new HashMap());if (oldMap != null) {synchronized(oldMap) {newMap.putAll(oldMap);}}this.copyOnThreadLocal.set(newMap);return newMap;}public void put(String key, String val) throws IllegalArgumentException {if (key == null) {throw new IllegalArgumentException("key cannot be null");} else {Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();Integer lastOp = this.getAndSetLastOperation(1);if (!this.wasLastOpReadOrNull(lastOp) && oldMap != null) {oldMap.put(key, val);} else {Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);newMap.put(key, val);}}}public void remove(String key) {if (key != null) {Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();if (oldMap != null) {Integer lastOp = this.getAndSetLastOperation(1);if (this.wasLastOpReadOrNull(lastOp)) {Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);newMap.remove(key);} else {oldMap.remove(key);}}}}public void clear() {this.lastOperation.set(1);this.copyOnThreadLocal.remove();}public String get(String key) {Map<String, String> map = (Map)this.copyOnThreadLocal.get();return map != null && key != null ? (String)map.get(key) : null;}public Map<String, String> getPropertyMap() {this.lastOperation.set(2);return (Map)this.copyOnThreadLocal.get();}public Set<String> getKeys() {Map<String, String> map = this.getPropertyMap();return map != null ? map.keySet() : null;}public Map<String, String> getCopyOfContextMap() {Map<String, String> hashMap = (Map)this.copyOnThreadLocal.get();return hashMap == null ? null : new HashMap(hashMap);}public void setContextMap(Map<String, String> contextMap) {this.lastOperation.set(1);Map<String, String> newMap = Collections.synchronizedMap(new HashMap());newMap.putAll(contextMap);this.copyOnThreadLocal.set(newMap);}
}
MessageStorage类
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;/*** Message storage*/
public class MessageStorage {private static final InternalLogger log = ClientLogger.getLog();private static BasicMDC mdcUtils;static {mdcUtils = new BasicMDC();}/*** Determine whether there is a grayscale client** @param cidAll* @return*/public static boolean hasCanaryRelease(List<String> cidAll) {return !getCanaryCids(cidAll).isEmpty();}/*** Determine if all are grayscale clients** @param cidAll* @return*/public static boolean allCanaryRelease(List<String> cidAll) {List<String> canaryCids = getCanaryCids(cidAll);return canaryCids.size() == cidAll.size();}/*** Connect the message queue into a string** @param messageQueues* @return*/public static String joinMessageQueue(List<MessageQueue> messageQueues) {return messageQueues.stream().map(mq -> mq.getBrokerName() + ":" + mq.getQueueId()).collect(Collectors.joining(", "));}/*** Get the list of grayscale clients** @param cidAll* @return*/public static List<String> getCanaryCids(List<String> cidAll) {return cidAll.stream().filter(cid -> cid.contains(CustomCommonConstant.CANARY_RELEASE_PREFIX)).collect(Collectors.toList());}/*** Get a list of non grayscale clients** @param cidAll* @return*/public static List<String> getNormalCids(List<String> cidAll) {return cidAll.stream().filter(cid -> !cid.contains(CustomCommonConstant.CANARY_RELEASE_PREFIX)).collect(Collectors.toList());}/*** Get the list of grayscale queues** @param mqAll* @return*/public static List<MessageQueue> getCanaryQueues(List<MessageQueue> mqAll) {Collections.sort(mqAll);log.info("topic:{} reBalance, has canary release client, allocate {} message queue by canary release strategy.\n", JSON.toJSONString(mqAll));int size = mqAll.size();if (size > CustomCommonConstant.GRAYSCALE_QUEUE_SIZE) {List<MessageQueue> lastTwo = mqAll.subList(size - CustomCommonConstant.GRAYSCALE_QUEUE_SIZE, size);return new ArrayList<>(lastTwo);} else {return new ArrayList<>();}}/*** Get non grayscale queue list** @param mqAll* @return*/public static List<MessageQueue> getNormalQueues(List<MessageQueue> mqAll) {Collections.sort(mqAll);log.info("topic:{} reBalance, has normal release client, allocate {} message queue by canary release strategy.\n", JSON.toJSONString(mqAll));int size = mqAll.size();if (size > CustomCommonConstant.GRAYSCALE_QUEUE_SIZE) {List<MessageQueue> lastTwo = mqAll.subList(0, size - 2);return new ArrayList<>(lastTwo);} else {return new ArrayList<>(mqAll);}}private static volatile String canaryRelease = null;/*** Determine whether it is a grayscale client** @return*/public static boolean isCanaryRelease() {return Boolean.parseBoolean(getCanaryRelease());}/*** Determine whether it is a grayscale client** @return boolean*/public static boolean isThreadCanaryRelease() {String data = mdcUtils.get(CustomCommonConstant.THREAD_CANARY_RELEASE);return Boolean.parseBoolean(data);}/*** Set MDC** @param key* @param value*/public static void setMDC(String key, String value) {mdcUtils.put(key, value);}/*** @param key* @return {@link String }*/public static String getMdcKey(String key) {return mdcUtils.get(key);}/*** Clear MDC*/public static void clearMDC() {mdcUtils.clear();}/*** Replacement strategy** @return {@link AllocateMessageQueueStrategy }*/public static AllocateMessageQueueStrategy getAllocateMessageQueueAveragely(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
// if (isCanaryRelease()) {
// return new CanaryAllocateMessageQueueStrategyImpl();
// } else {
// return allocateMessageQueueStrategy;
// }return new CanaryAllocateMessageQueueStrategyImpl();}private static String getCanaryRelease() {if (Objects.isNull(canaryRelease)) {synchronized (MessageStorage.class) {if (Objects.nonNull(canaryRelease)) {return canaryRelease;}//CANARY_RELEASEString tmpCanaryRelease = System.getProperty("canary.release");if (Objects.isNull(tmpCanaryRelease)) {tmpCanaryRelease = "false";}canaryRelease = tmpCanaryRelease;return canaryRelease;}}return canaryRelease;}
}
拦截配置
消费者拦截
@Component
@Aspect
public class ConsumerAOP {@Pointcut("execution(public * org.apache.rocketmq.client.consumer.listener.*.*(..))")public void aspectConsumer() {}@Before("aspectConsumer()")public void doBefore(JoinPoint joinPoint) {Object[] args = joinPoint.getArgs();for (Object arg : args) {if (arg instanceof List<?>) {List<MessageExt> messageExtList = (List<MessageExt>) arg;for (MessageExt messageExt : messageExtList) {String threadCanaryRelease = messageExt.getProperty(CustomCommonConstant.THREAD_CANARY_RELEASE);if(StringUtils.isEmpty(threadCanaryRelease)){threadCanaryRelease = "false";}MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);}}}}
}
@Component
@Aspect
public class ListenerAOP {@Pointcut("execution(public * org.apache.rocketmq.spring.core.*.*(..))")public void aspectListener(){}@Before("aspectListener()")public void doBefore(JoinPoint joinPoint){Object[] args = joinPoint.getArgs();for (Object arg : args) {if (arg instanceof MessageExt){MessageExt messageExt = (MessageExt) arg;String threadCanaryRelease = messageExt.getProperty(CustomCommonConstant.THREAD_CANARY_RELEASE);if(StringUtils.isEmpty(threadCanaryRelease)){threadCanaryRelease = "false";}MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);}}}
}
生产者拦截
@Component
@Aspect
public class ProducerAOP {@Pointcut("execution(public * org.apache.rocketmq.client.producer.*.*(..))")public void aspectProducer() {}@Before("aspectProducer()")public void doBefore(JoinPoint joinPoint) {Object[] args = joinPoint.getArgs();for (Object arg : args) {if (arg instanceof Message) {Message message = (Message) arg;String threadCanaryRelease = MessageStorage.getMdcKey(CustomCommonConstant.THREAD_CANARY_RELEASE);if(StringUtils.isEmpty(threadCanaryRelease)){threadCanaryRelease = "false";}message.putUserProperty(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);}}}
}
请求拦截
@Slf4j
@Component
@WebFilter(urlPatterns = "/*", filterName = "requestFilter")
@Order(Integer.MIN_VALUE)
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ForwardFilter implements Filter {@SneakyThrows@Overridepublic void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) {HttpServletRequest req = (HttpServletRequest) servletRequest;String threadCanaryRelease = req.getHeader("THREAD_CANARY_RELEASE");log.info("attributeNames:{}", JSON.toJSONString(req.getHeaderNames()));if(Objects.nonNull(threadCanaryRelease)){MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, "true");}filterChain.doFilter(servletRequest, servletResponse);}
}
验证过程
消费者(未启动灰度订阅)

消费者(灰度订阅)

验证消息链路
发送端
@Slf4j
@RestController
@RequestMapping("/producer")
public class ProducerMessageContrlller {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@PostMapping("/send")public void send(){boolean canaryRelease = MessageStorage.isCanaryRelease();String body = String.format("发送消息,环境:%s", canaryRelease ? "灰度" : "正式");
// for (Integer i = 0; i < 100; i++) {log.info("发送消息,环境:{}", canaryRelease ? "灰度" : "正式");rocketMQTemplate.convertAndSend("TEST-DATA-MSG", body);
// }}
}
消费端
@Slf4j
@Service
@RocketMQMessageListener(topic = "TEST-DATA-MSG", consumerGroup = "test-consumer-group")
public class MyConsumer1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {boolean canaryRelease = MessageStorage.isCanaryRelease();Map<String, String> properties = message.getProperties();log.info("received message: 【{}】 环境:【{}】 配置参数:【{}】 ", new String(message.getBody()),canaryRelease ? "灰度" : "正式",JSON.toJSONString(properties));}
}
发送灰度消息



相关文章:
SpringCloud 微服务消息队列灰度方案 (RocketMQ 4.x)
目录 背景遇到的问题 RocketMQ 基础基础消息模型扩展后的消息模型部署模型相关概念点 方案对比影子Topic的方案Tag的方案UserProperty的方案影子Group的方案灰度分区的方案方案对比 灰度分区方案设计适配只有部分灰度的情况所做的功能扩展消费者(无灰度)…...
厘清标准差和标准误:因果推断的统计学基础
标准差,指 一次抽样中 个体取值间的离散程度,反映了 个体取值对样本均值的代表性。 标准误,指 多次抽样中 样本均值间的离散程度,反映了 样本均值对总体均值的代表性。 公众号原文-厘清标准差和标准误:因果推断的统计…...
GESP4级考试语法知识(贪心算法(二))
排队接水2代码: #include<iostream> #include<cstdio> #include<algorithm> using namespace std; struct people {int num;int time; }; people s[1001]; int n,r,a[1001]; double sum,ave; bool cmp(people x,people y) {return x.time<y.t…...
MATLAB 使用教程 —— 命令窗口输入命令,工作区显示变量
命令在命令窗口输入变量在工作区显示 MATLAB 桌面包含的面板如下: 当前文件夹 - 此面板允许访问项目文件夹和文件。命令窗口 - 这是主要区域,用户在命令行中输入命令,命令提示符(>>).工作区 - 工作区显示所有变量,无论是创…...
LeetCode 热题100(八)【二叉树】(3)
目录 8.11二叉树展开为链表(中等) 8.12从前序与中序遍历序列构造二叉树(中等) 8.13路径总和III(中等) 8.14二叉树的最近公共祖先(中等) 8.15二叉树中的最大路径和(困…...
uniapp h5实现录音
使用npm安装 npm install recorder-core引入Recorder库 可以使用import、require、html script等你适合的方式来引入js文件,下面的以import为主要参考,其他引入方式根据文件路径自行调整一下就可以了。 //必须引入的Recorder核心(文件路径是…...
字节跳动Android面试题汇总及参考答案(80+面试题,持续更新)
Android 四大组件是什么? Android 四大组件分别是 Activity、Service、Broadcast Receiver 和 Content Provider。 Activity 是 Android 应用中最基本的组件,用于实现用户界面。它可以包含各种视图控件,如按钮、文本框等。一个 Activity 通常对应一个屏幕的内容。用户可以通…...
【go从零单排】通道select、通道timeout、Non-Blocking Channel Operations非阻塞通道操作
🌈Don’t worry , just coding! 内耗与overthinking只会削弱你的精力,虚度你的光阴,每天迈出一小步,回头时发现已经走了很远。 📗概念 select 语句是 Go 的一种控制结构,用于等待多个通道操作。它类似于 s…...
PSRR仿真笔记
1.首先打开bandgap的testbench电路,选择schematic 2.打开电路后,选择VDD模块,然后按键盘Q,进行编辑,将AC magnitude改为1 V 3.修改完成后,点击左上角Launch > ADE Explorer 4.在出现的窗口中,选择Creat…...
AUTOSAR_EXP_ARAComAPI的7章笔记(3)
☞返回总目录 相关总结:AutoSar AP简单多绑定总结 7.3 多绑定 如在 5.4.3 小节中简要讨论的,某个代理类 / 骨架类的不同实例之间的技术传输是不同的,多绑定描述了这种情况的解决方案。多种技术原因都可能导致这种情况出现: 代…...
WSADATA 关键字详细介绍
WSADATA 是 Windows Sockets API(Winsock)中用于存储 Winsock 库的初始化信息的结构体。在使用 Winsock API 之前,必须通过调用 WSAStartup() 函数进行初始化,WSADATA 结构体用于接收有关 Winsock 库版本的信息。Winsock 是 Windo…...
Day44 | 动态规划 :状态机DP 买卖股票的最佳时机IV买卖股票的最佳时机III
Day44 | 动态规划 :状态机DP 买卖股票的最佳时机IV&&买卖股票的最佳时机III&&309.买卖股票的最佳时机含冷冻期 动态规划应该如何学习?-CSDN博客 本次题解参考自灵神的做法,大家也多多支持灵神的题解 买卖股票的最佳时机【…...
Area-Composition模型部署指南
一、介绍 本模型可以通过输入不同的提示词,然后根据各部分提示词进行融合生成图片。如下图: 此图像包含 4 个不同的区域:夜晚、傍晚、白天、早晨 二、部署 环境要求: 最低显存:10G 1. 部署ComfyUI 本篇的模型部署…...
策略模式、状态机详细解读
策略模式 (Strategy Pattern) 策略模式 (Strategy Pattern) 是一种行为型设计模式,旨在将一组算法封装成独立的类,使得它们可以相互替换。这种模式让算法的变化不会影响到使用算法的客户,减少了类之间的耦合。策略模式通常用于处理一类问题&…...
OpenWrt广播DNS到客户端
OpenWrt广播DNS到客户端 Network -> Interfaces -> lan ->DHCP Server -> Advanced Settings -> DHCP-Options 设置 6,dns1,dns2 如下图 也可以直接编辑 /etc/config/dhcp config dhcp lan list dhcp_option 6,119.29.29.29,223.5.5.5 #ipv4 option dns 2402:4…...
C++编程技巧与规范-类和对象
类和对象 1. 静态对象的探讨与全局对象的构造顺序 静态对象的探讨 类中的静态成员变量(类类型静态成员) 类中静态变量的声明与定义(类中声明类外定义) #include<iostream> using namespace std;namespace _nmspl {class A{public:A():m_i(5){…...
AutoHotKey自动热键AHK-正则表达式
在这个软件的操作中,基本都是需要即时的解决一些问题,所以对字符串的操作是比较多的,所以正则的使用还是比较重要的,接下来我们用一个例子来了解正则表达式的使用 str "7654321" RegExMatch(str, "65(43)(21)", SubPat)str ( str %str% SubPat %SubPa…...
【3D Slicer】的小白入门使用指南四
开源解剖影像浏览工具Open Anatomy Browser使用及介绍 和3D slicer米有太大关系,该工具是网页版影像数据的浏览工具(可以简单理解为网页版的3D slicer) 介绍 ● 开放解剖(OA)浏览器是由神经影像分析中心开发的,基于网络浏览器技术构建的图谱查看器。 ● OA浏览器将解剖模…...
flink同步mysql数据表到pg库
1.关闭防火墙和selinux systemctl stop firewalld systemctl disable firewalld systemctl status firewalldvi /etc/selinux/config 修改为disabled2.安装java8 yum list java-1.8* yum install java-1.8.0-openjdk* -yjava -version3.下载和部署postgresql 下载地址&#…...
AndroidStudio-常用布局
一、线性布局LinearLayout 线性布局内部的各视图有两种排列方式: 1.orientation属性值为horizontal时,内部视图在水平方向从左往右排列。 2.orientation属性值为vertical时,内部视图在垂直方向从上往下排列。 如果不指定orientation属性,…...
铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...
使用VSCode开发Django指南
使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架,专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用,其中包含三个使用通用基本模板的页面。在此…...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...
解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八
现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet,点击确认后如下提示 最终上报fail 解决方法 内核升级导致,需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...
屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...
Linux云原生安全:零信任架构与机密计算
Linux云原生安全:零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言:云原生安全的范式革命 随着云原生技术的普及,安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测,到2025年,零信任架构将成为超…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
前端开发面试题总结-JavaScript篇(一)
文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包(Closure)?闭包有什么应用场景和潜在问题?2.解释 JavaScript 的作用域链(Scope Chain) 二、原型与继承3.原型链是什么?如何实现继承&a…...
