当前位置: 首页 > news >正文

SpringCloud 微服务消息队列灰度方案 (RocketMQ 4.x)

目录

    • 背景
      • 遇到的问题
    • RocketMQ 基础
      • 基础消息模型
      • 扩展后的消息模型
      • 部署模型
      • 相关概念点
    • 方案对比
      • 影子Topic的方案
      • Tag的方案
      • UserProperty的方案
      • 影子Group的方案
      • 灰度分区的方案
      • 方案对比
    • 灰度分区方案设计
      • 适配只有部分灰度的情况所做的功能扩展
      • 消费者(无灰度)
      • 消费者(有灰度)
    • 改造流程
      • 生产者改造逻辑
      • 消费者改造逻辑
        • 消费者自定义负载均衡
      • MQ注册ClientID修改
      • 常量配置类
      • 链路信息传递`BasicMDC`类
      • MessageStorage类
      • 拦截配置
        • 消费者拦截
        • 生产者拦截
        • 请求拦截
    • 验证过程
      • 消费者(未启动灰度订阅)
      • 消费者(灰度订阅)
      • 验证消息链路
        • 发送端
        • 消费端
        • 发送灰度消息

背景

  • 我们公司团队为了更好地控制版本发布的影响范围,自研了灰度发布流程,目前已经支持httpgRPC等接口调用方式进行灰度流量转发,但是消息队列基于业务实现展示不支持。
  • 参考了网上很多灰度方案博文,比较热门的vivo鲁班RocketMQ的文章,但是涉及到的改造范围都是比较大,最后在看到关于Asyncer关于灰度分区 的博文,整体比较巧妙的利用MQ的原有机制,改动相对少了很多

遇到的问题

在这里插入图片描述
如上图所示普通的业务灰度流程,保证了RPC服务之间的调用灰度,但是当消息从服务发出到消息队列后,队列分区是被均分到正常服务灰度服务监听的;这样会导致正常服务消费到灰度消息;同时灰度服务消费到正常的消息;所以MQ灰度是目前需要解决的问题,这样才能完成整个灰度链路;

RocketMQ 基础

基础消息模型

在这里插入图片描述

扩展后的消息模型

在这里插入图片描述

  • 相同的ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式,和集群模式(图中是最常用的集群模式)。
  • 在集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,如图中 ConsumerGroupA 订阅 TopicA,TopicA 对应 3个队列,则 GroupA 中的 Consumer1 消费的是 MessageQueue 0和 MessageQueue 1的消息,Consumer2是消费的是MessageQueue2的消息。
  • 在广播模式下,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列。需要注意的是,广播模式下因为每个 Consumer 实例都需要处理全部的消息,因此这种模式仅推荐在通知推送、配置同步类小流量场景使用。

部署模型

Producer、Consumer又是如何找到Topic和Broker的地址呢?消息的具体发送和接收又是怎么进行的呢?

在这里插入图片描述

RocketMQ 部署架构上主要分为四部分:

  • 生产者 Producer

    • 发布消息的角色。Producer通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。
  • 消费者 Consumer

    • 消息消费的角色
    • 支持以推(push),拉(pull)两种模式对消息进行消费。
    • 同时也支持集群方式和广播方式的消费。
    • 提供实时消息订阅机制,可以满足大多数用户的需求。
  • 名字服务器 NameServer

    • NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
      • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
      • 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
  • 代理服务器 Broker

    • Broker主要负责消息的存储、投递和查询以及服务高可用保证。
    • NameServer几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker部署相对复杂。
      • 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
      • Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
      • Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。

相关概念点

  • CommitLog:消息体实际存储的地方,当我们发送的任一业务消息的时候,它最终会存储在commitLog上。MQ在Broker进行集群部署(这里为也简洁,不涉及主从部分)时,同一业务消息只会落到集群的某一个Broker节点上。而这个Broker上的commitLog就会存储所有Topic路由到它的消息,当消息数据量到达1个G后会重新生成一个新的commitLog。
  • Topic:消息主题,表示一类消息的逻辑集合。每条消息只属于一个Topic,Topic中包含多条消息,是MQ进行消息发送订阅的基本单位。属于一级消息类型,偏重于业务逻辑设计。
  • Tag:消息标签,二级消息类型,每一个具体的消息都可以选择性地附带一个Tag,用于区分同一个Topic中的消息类型
  • Queue:实际上Topic更像是一个逻辑概念供我们使用,在源码层级看,Topic以Queue的形式分布在多个Broker上,一个topic往往包含多条Queue
  • 消费组及其ID:表示一类Producer或Consumer,这类Producer或Consumer通常生产或消费同应用域的消息,且消息生产与消费的逻辑一致。每个消费组可以定义全局维一的GroupID来标识,由它来代表消费组。不同的消费组在消费时互相隔离,不会影响彼此的消费位点计算。

方案对比

影子Topic的方案

  • 新建一系列新的Topic来处理隔离灰度的消息
    • 例如对于TOPIC_ORDER会创建TOPIC_ORDER_GRAY来给灰度环境使用。
  • 发送方在发送时,对灰度的消息写入到影子Topic中。消费时,灰度环境只使用灰度的分组订阅灰度的Topic。

在这里插入图片描述

Tag的方案

  • 发送方在发送时,对灰度环境产生的消息的Tag加灰度标识
  • 消费方,每次灰度版本发布时只订阅灰度Tag的消息,正常的版本订阅非灰度的Tag。

在这里插入图片描述

UserProperty的方案

  • 发送方在发送时,对灰度环境产生的消息的UserProperty加灰度标识。
  • 消费方的客户端需要进行改写,根据UserProperty来过滤,正常的版本会跳过这类消息,灰度的版本会处理灰度消息。
  • 流程与灰度Tag差不错

影子Group的方案

  • 发送消息:灰度的服务发送带有灰度标识的消息
  • 消费消息:灰度服务只消费灰度标识的消息
    在这里插入图片描述

灰度分区的方案

  • 发送者注册时会检测当前将是不是灰度节点,灰度节点MQ注册的clientId会添加标识
  • 消费者订阅时会检测当前节点是不是灰度节点,灰度节点会走灰度的Queue分配策略

在这里插入图片描述

方案对比

方案优点缺点成本
影子Topic的方案使用独立的两个TOPIC,订阅关系一致,改造比较容易有临界问题,灰度切换生产时有可能会漏掉消息;同时需要根据生产、消费者关系维护对应的订阅关系;改造数据需求需要维护两套消费组和TOPC,有维护成本
影子Group的方案使用独立的两个GROUP,订阅关系一致有临界问题;需要修改生产者、订阅者、DevOps改动范围较大;正常节点和灰度节点都需要知道当前服务的灰度状态,来做出对应的处理需要维护两套GROUP的关系,维护成本高
Tag的方案通过MQ提供的tag机制过滤,可以保证灰度的消息只会被灰度节点消费,改造简单;如果Tag参与的业务过滤,不适合该方案;如果没有灰度节点订阅关系不一致,会出现消息丢失
UserProperty的方案同上同上同上
灰度分区的方案订阅关系一致,不需要额外维护多一套TOPIC、GROUP;无(比较适合公司的一个运营模式)

灰度分区方案设计

灰度分区主要基于以下几点:

  • DevOps:灰度服务的Pod容器内部会有 CANARY_RELEASEtrue 的环境变量。
  • MQ客户端心跳上报:源码中,RocketMQ客户端启动时会想向所有产生订阅关系的broker发送心跳,心跳中带有clientId,该值主要由实例名、容器ip等组成,可以利用canary环境变量做一层额外的注入
  • MQ客户端重平衡:源码中,每隔20秒/客户端上下线,都会触发一次客户端重平衡;我们可以自定义该策略,加入灰度分区平衡逻辑,来实现灰度分区和正常分区的订阅
  • MQ客户端发送方:源码中,RocketMQ发送方每次发送消息都会轮询队列发送,同时加入重试和故障规避的策略,可以通过重写该类来做扩展。

适配只有部分灰度的情况所做的功能扩展

  • 基于ThreadLocal来实现消费和发送的链路标识传递
    • 拦截HTTP请求:通过拦截http请求识别请求是否灰度,基于ThreadLocal实现线程传递;
    • 拦截MQ消息消费:通过拦截消费者消息,识别UserProperty识别存在灰度标识;更新到ThreadLocal中
    • 拦截MQ生产者发送消息:获取ThreadLocal存储的当前线程的UserProperty标识,写入到发送消息的UserProperty
  • MQ客户端发送方:检测到环境和线程链路变量,做出对应的发送策略;从而来实现灰度消息通过正常应用后,发送MQ消息能被下级监听的灰度应用接收

消费者(无灰度)

在这里插入图片描述

消费者(有灰度)

在这里插入图片描述

改造流程

生产者改造逻辑

无论何时,只要发送方自己当前环境是灰度(CANARY_RELEASE=true)或者当前是灰度链路,则会最后两个分区作为灰度队列,否则选取其他分区发送,根据RocketMQ的源码,自定义发送策略即可实现。


import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.latency.LatencyFaultTolerance;
import org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;/*** Custom fault strategy*/
public class CustomMQFaultStrategy {private final InternalLogger log = ClientLogger.getLog();private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();private boolean sendLatencyFaultEnable = false;private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};private final ConcurrentHashMap<TopicPublishInfo, TopicPublishInfoCache> topicPublishInfoCacheTable = new ConcurrentHashMap<>();public long[] getNotAvailableDuration() {return notAvailableDuration;}public void setNotAvailableDuration(final long[] notAvailableDuration) {this.notAvailableDuration = notAvailableDuration;}public long[] getLatencyMax() {return latencyMax;}public void setLatencyMax(final long[] latencyMax) {this.latencyMax = latencyMax;}public boolean isSendLatencyFaultEnable() {return sendLatencyFaultEnable;}public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {this.sendLatencyFaultEnable = sendLatencyFaultEnable;}private TopicPublishInfoCache checkCacheChanged(TopicPublishInfo topicPublishInfo) {if (topicPublishInfoCacheTable.containsKey(topicPublishInfo)) {return topicPublishInfoCacheTable.get(topicPublishInfo);}synchronized (this) {TopicPublishInfoCache cache = new TopicPublishInfoCache();List<MessageQueue> canaryQueues = MessageStorage.getCanaryQueues(topicPublishInfo.getMessageQueueList());List<MessageQueue> normalQueues = MessageStorage.getNormalQueues(topicPublishInfo.getMessageQueueList());Collections.sort(canaryQueues);Collections.sort(normalQueues);cache.setCanaryQueueList(canaryQueues);cache.setNormalQueueList(normalQueues);topicPublishInfoCacheTable.putIfAbsent(topicPublishInfo, cache);}return topicPublishInfoCacheTable.get(topicPublishInfo);}/*** Queue selection strategy*/public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {List<MessageQueue> messageQueueList = tpInfo.getMessageQueueList();TopicPublishInfoCache topicPublishInfoCache = checkCacheChanged(tpInfo);if (MessageStorage.isCanaryRelease() || MessageStorage.isThreadCanaryRelease()) {MessageQueue messageQueue = selectDefaultMessageQueue(tpInfo, lastBrokerName, topicPublishInfoCache.getCanaryQueueList());log.debug("canary context,send message to canary queue:{}", messageQueue.getBrokerName() + messageQueue.getQueueId());return messageQueue;} else {if (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();int size = topicPublishInfoCache.getNormalQueueList().size();for (int i = 0; i < size; i++) {int pos = Math.max(Math.abs(index++) % size, 0);MessageQueue mq = topicPublishInfoCache.getNormalQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {return mq;}}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (!topicPublishInfoCache.getCanaryQueueList().contains(mq)) {if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;}} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}}return selectDefaultMessageQueue(tpInfo, lastBrokerName, topicPublishInfoCache.getNormalQueueList());}}/*** Default message queue selection strategy** @param topicPublishInfo* @param lastBrokerName* @param queues* @return {@link MessageQueue }*/private MessageQueue selectDefaultMessageQueue(final TopicPublishInfo topicPublishInfo, final String lastBrokerName,List<MessageQueue> queues) {ThreadLocalIndex sendWhichQueue = topicPublishInfo.getSendWhichQueue();int size = queues.size();if (lastBrokerName != null) {for (int i = 0; i < size; i++) {int index = sendWhichQueue.incrementAndGet();int pos = Math.max(Math.abs(index) % size, 0);MessageQueue mq = queues.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}}int i = sendWhichQueue.incrementAndGet();int res = Math.max(Math.abs(i) % size, 0);log.debug("selectDefaultMessageQueue, lastBrokerName:{}, res:{}", lastBrokerName, topicPublishInfo.getMessageQueueList().get(res));return queues.get(res);}public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}}private long computeNotAvailableDuration(final long currentLatency) {for (int i = latencyMax.length - 1; i >= 0; i--) {if (currentLatency >= latencyMax[i]) {return this.notAvailableDuration[i];}}return 0;}/*** Cache of topic publish info*/private static class TopicPublishInfoCache {/*** Grayscale message queue list*/private List<MessageQueue> canaryQueueList;private List<MessageQueue> normalQueueList;public List<MessageQueue> getCanaryQueueList() {return canaryQueueList;}public void setCanaryQueueList(List<MessageQueue> canaryQueueList) {this.canaryQueueList = canaryQueueList;}public List<MessageQueue> getNormalQueueList() {return normalQueueList;}public void setNormalQueueList(List<MessageQueue> normalQueueList) {this.normalQueueList = normalQueueList;}}
}

消费者改造逻辑

  • 其实最大的问题在于消费方如何动态的感知灰度的状态流转,这也是产生之前灰度分区方案的临界问题的根本原因。但是通过源码的深入探索,发现其实我们可以通过改造ClientId和自定义负载均衡策略来实现;

  • RocketMQ客户端启动的时候,会构建本地客户端id(包括实例名、ip名等),然后向broker注册自己。我们可以通过DevOps注入的环境变量CANARY_RELEASE来做改造,即灰度服务clientId后面追加canary表示,default服务后面追加default标识;

消费者自定义负载均衡

import com.alibaba.fastjson.JSON;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;public class CanaryAllocateMessageQueueStrategyImpl implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {log.info("consumerGroup:{} currentCID:{} cidAll:{}",consumerGroup,currentCID, JSON.toJSONString(cidAll));if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return Collections.emptyList();}if (mqAll.stream().anyMatch(mq -> mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {return allocateByAvg(consumerGroup, currentCID, mqAll, cidAll);}if (!MessageStorage.hasCanaryRelease(cidAll)) {List<MessageQueue> allocate = allocateByAvg(consumerGroup, currentCID, mqAll, cidAll);return allocate;}if (MessageStorage.allCanaryRelease(cidAll)) {List<MessageQueue> messageQueues = this.balanceAllocate(consumerGroup, currentCID, mqAll, cidAll);log.info("[canary allocate]: group:{} sub topic:{} has all canary release client,maybe the sub is new,use the default avg strategy.\n" +"current cid:{}\n" +"allocate total {} message queue\n" +"result:\n{}",consumerGroup,mqAll.get(0).getTopic(),messageQueues.size(),currentCID,MessageStorage.joinMessageQueue(messageQueues));return messageQueues;}List<String> canaryCids = MessageStorage.getCanaryCids(cidAll);List<String> normalCids = MessageStorage.getNormalCids(cidAll);List<MessageQueue> canaryQueues = MessageStorage.getCanaryQueues(mqAll);List<MessageQueue> normalQueues = MessageStorage.getNormalQueues(mqAll);Collections.sort(canaryCids);Collections.sort(normalCids);Collections.sort(normalQueues);Collections.sort(canaryQueues);List<MessageQueue> result = null;if (canaryCids.contains(currentCID)) {result = allocateByAvg(consumerGroup, currentCID, canaryQueues, canaryCids);} else {result = allocateByAvg(consumerGroup, currentCID, normalQueues, normalCids);}return result;}/*** @param consumerGroup* @param currentCID* @param mqAll* @param cidAll* @return {@link List }<{@link MessageQueue }>*/private List<MessageQueue> allocateByAvg(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}int index = cidAll.indexOf(currentCID);int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}@Overridepublic String getName() {return "CANARY";}public boolean check(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (StringUtils.isEmpty(currentCID)) {throw new IllegalArgumentException("currentCID is empty");}if (CollectionUtils.isEmpty(mqAll)) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (CollectionUtils.isEmpty(cidAll)) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return false;}return true;}public List<MessageQueue> balanceAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}int index = cidAll.indexOf(currentCID);int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}
}

MQ注册ClientID修改

    public String buildMQClientId() {StringBuilder sb = new StringBuilder();sb.append(this.getClientIP());sb.append("@");sb.append(this.getInstanceName());if (!UtilAll.isBlank(this.unitName)) {sb.append("@");sb.append(this.unitName);}//The key is hereif (MessageStorage.isCanaryRelease()) {sb.append(CustomCommonConstant.CANARY_RELEASE_PREFIX);} else {sb.append("@default");}if (this.enableStreamRequestType) {sb.append("@");sb.append(RequestType.STREAM);}return sb.toString();}

常量配置类

public class CustomCommonConstant {/*** Number of grayscale queues*/public static final Integer GRAYSCALE_QUEUE_SIZE = 2;/*** Grayscale client identification*/public static final String CANARY_RELEASE_PREFIX = "@canary";/*** Thread identification*/public static final String THREAD_CANARY_RELEASE = "canary.release";
}

链路信息传递BasicMDC


import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;public class BasicMDC {final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal<>();private static final int WRITE_OPERATION = 1;private static final int MAP_COPY_OPERATION = 2;final ThreadLocal<Integer> lastOperation = new ThreadLocal();private Integer getAndSetLastOperation(int op) {Integer lastOp = (Integer)this.lastOperation.get();this.lastOperation.set(op);return lastOp;}private boolean wasLastOpReadOrNull(Integer lastOp) {return lastOp == null || lastOp == 2;}private Map<String, String> duplicateAndInsertNewMap(Map<String, String> oldMap) {Map<String, String> newMap = Collections.synchronizedMap(new HashMap());if (oldMap != null) {synchronized(oldMap) {newMap.putAll(oldMap);}}this.copyOnThreadLocal.set(newMap);return newMap;}public void put(String key, String val) throws IllegalArgumentException {if (key == null) {throw new IllegalArgumentException("key cannot be null");} else {Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();Integer lastOp = this.getAndSetLastOperation(1);if (!this.wasLastOpReadOrNull(lastOp) && oldMap != null) {oldMap.put(key, val);} else {Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);newMap.put(key, val);}}}public void remove(String key) {if (key != null) {Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();if (oldMap != null) {Integer lastOp = this.getAndSetLastOperation(1);if (this.wasLastOpReadOrNull(lastOp)) {Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);newMap.remove(key);} else {oldMap.remove(key);}}}}public void clear() {this.lastOperation.set(1);this.copyOnThreadLocal.remove();}public String get(String key) {Map<String, String> map = (Map)this.copyOnThreadLocal.get();return map != null && key != null ? (String)map.get(key) : null;}public Map<String, String> getPropertyMap() {this.lastOperation.set(2);return (Map)this.copyOnThreadLocal.get();}public Set<String> getKeys() {Map<String, String> map = this.getPropertyMap();return map != null ? map.keySet() : null;}public Map<String, String> getCopyOfContextMap() {Map<String, String> hashMap = (Map)this.copyOnThreadLocal.get();return hashMap == null ? null : new HashMap(hashMap);}public void setContextMap(Map<String, String> contextMap) {this.lastOperation.set(1);Map<String, String> newMap = Collections.synchronizedMap(new HashMap());newMap.putAll(contextMap);this.copyOnThreadLocal.set(newMap);}
}

MessageStorage类


import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;/*** Message storage*/
public class MessageStorage {private static final InternalLogger log = ClientLogger.getLog();private static BasicMDC mdcUtils;static {mdcUtils = new BasicMDC();}/*** Determine whether there is a grayscale client** @param cidAll* @return*/public static boolean hasCanaryRelease(List<String> cidAll) {return !getCanaryCids(cidAll).isEmpty();}/*** Determine if all are grayscale clients** @param cidAll* @return*/public static boolean allCanaryRelease(List<String> cidAll) {List<String> canaryCids = getCanaryCids(cidAll);return canaryCids.size() == cidAll.size();}/*** Connect the message queue into a string** @param messageQueues* @return*/public static String joinMessageQueue(List<MessageQueue> messageQueues) {return messageQueues.stream().map(mq -> mq.getBrokerName() + ":" + mq.getQueueId()).collect(Collectors.joining(", "));}/*** Get the list of grayscale clients** @param cidAll* @return*/public static List<String> getCanaryCids(List<String> cidAll) {return cidAll.stream().filter(cid -> cid.contains(CustomCommonConstant.CANARY_RELEASE_PREFIX)).collect(Collectors.toList());}/*** Get a list of non grayscale clients** @param cidAll* @return*/public static List<String> getNormalCids(List<String> cidAll) {return cidAll.stream().filter(cid -> !cid.contains(CustomCommonConstant.CANARY_RELEASE_PREFIX)).collect(Collectors.toList());}/*** Get the list of grayscale queues** @param mqAll* @return*/public static List<MessageQueue> getCanaryQueues(List<MessageQueue> mqAll) {Collections.sort(mqAll);log.info("topic:{} reBalance, has canary release client, allocate {} message queue by canary release strategy.\n", JSON.toJSONString(mqAll));int size = mqAll.size();if (size > CustomCommonConstant.GRAYSCALE_QUEUE_SIZE) {List<MessageQueue> lastTwo = mqAll.subList(size - CustomCommonConstant.GRAYSCALE_QUEUE_SIZE, size);return new ArrayList<>(lastTwo);} else {return new ArrayList<>();}}/*** Get non grayscale queue list** @param mqAll* @return*/public static List<MessageQueue> getNormalQueues(List<MessageQueue> mqAll) {Collections.sort(mqAll);log.info("topic:{} reBalance, has normal release client, allocate {} message queue by canary release strategy.\n", JSON.toJSONString(mqAll));int size = mqAll.size();if (size > CustomCommonConstant.GRAYSCALE_QUEUE_SIZE) {List<MessageQueue> lastTwo = mqAll.subList(0, size - 2);return new ArrayList<>(lastTwo);} else {return new ArrayList<>(mqAll);}}private static volatile String canaryRelease = null;/*** Determine whether it is a grayscale client** @return*/public static boolean isCanaryRelease() {return Boolean.parseBoolean(getCanaryRelease());}/*** Determine whether it is a grayscale client** @return boolean*/public static boolean isThreadCanaryRelease() {String data = mdcUtils.get(CustomCommonConstant.THREAD_CANARY_RELEASE);return Boolean.parseBoolean(data);}/*** Set MDC** @param key* @param value*/public static void setMDC(String key, String value) {mdcUtils.put(key, value);}/*** @param key* @return {@link String }*/public static String getMdcKey(String key) {return mdcUtils.get(key);}/*** Clear MDC*/public static void clearMDC() {mdcUtils.clear();}/*** Replacement strategy** @return {@link AllocateMessageQueueStrategy }*/public static AllocateMessageQueueStrategy getAllocateMessageQueueAveragely(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
//        if (isCanaryRelease()) {
//            return new CanaryAllocateMessageQueueStrategyImpl();
//        } else {
//            return allocateMessageQueueStrategy;
//        }return new CanaryAllocateMessageQueueStrategyImpl();}private static String getCanaryRelease() {if (Objects.isNull(canaryRelease)) {synchronized (MessageStorage.class) {if (Objects.nonNull(canaryRelease)) {return canaryRelease;}//CANARY_RELEASEString tmpCanaryRelease = System.getProperty("canary.release");if (Objects.isNull(tmpCanaryRelease)) {tmpCanaryRelease = "false";}canaryRelease = tmpCanaryRelease;return canaryRelease;}}return canaryRelease;}
}

拦截配置

消费者拦截
@Component
@Aspect
public class ConsumerAOP {@Pointcut("execution(public * org.apache.rocketmq.client.consumer.listener.*.*(..))")public void aspectConsumer() {}@Before("aspectConsumer()")public void doBefore(JoinPoint joinPoint) {Object[] args = joinPoint.getArgs();for (Object arg : args) {if (arg instanceof List<?>) {List<MessageExt> messageExtList = (List<MessageExt>) arg;for (MessageExt messageExt : messageExtList) {String threadCanaryRelease = messageExt.getProperty(CustomCommonConstant.THREAD_CANARY_RELEASE);if(StringUtils.isEmpty(threadCanaryRelease)){threadCanaryRelease = "false";}MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);}}}}
}
@Component
@Aspect
public class ListenerAOP {@Pointcut("execution(public * org.apache.rocketmq.spring.core.*.*(..))")public void aspectListener(){}@Before("aspectListener()")public void doBefore(JoinPoint joinPoint){Object[] args = joinPoint.getArgs();for (Object arg : args) {if (arg instanceof MessageExt){MessageExt messageExt = (MessageExt) arg;String threadCanaryRelease = messageExt.getProperty(CustomCommonConstant.THREAD_CANARY_RELEASE);if(StringUtils.isEmpty(threadCanaryRelease)){threadCanaryRelease = "false";}MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);}}}
}
生产者拦截
@Component
@Aspect
public class ProducerAOP {@Pointcut("execution(public * org.apache.rocketmq.client.producer.*.*(..))")public void aspectProducer() {}@Before("aspectProducer()")public void doBefore(JoinPoint joinPoint) {Object[] args = joinPoint.getArgs();for (Object arg : args) {if (arg instanceof Message) {Message message = (Message) arg;String threadCanaryRelease = MessageStorage.getMdcKey(CustomCommonConstant.THREAD_CANARY_RELEASE);if(StringUtils.isEmpty(threadCanaryRelease)){threadCanaryRelease = "false";}message.putUserProperty(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);}}}
}
请求拦截
@Slf4j
@Component
@WebFilter(urlPatterns = "/*", filterName = "requestFilter")
@Order(Integer.MIN_VALUE)
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ForwardFilter implements Filter {@SneakyThrows@Overridepublic void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) {HttpServletRequest req = (HttpServletRequest) servletRequest;String threadCanaryRelease = req.getHeader("THREAD_CANARY_RELEASE");log.info("attributeNames:{}", JSON.toJSONString(req.getHeaderNames()));if(Objects.nonNull(threadCanaryRelease)){MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, "true");}filterChain.doFilter(servletRequest, servletResponse);}
}

验证过程

消费者(未启动灰度订阅)

在这里插入图片描述

消费者(灰度订阅)

在这里插入图片描述

验证消息链路

发送端
@Slf4j
@RestController
@RequestMapping("/producer")
public class ProducerMessageContrlller {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@PostMapping("/send")public void send(){boolean canaryRelease = MessageStorage.isCanaryRelease();String body = String.format("发送消息,环境:%s", canaryRelease ? "灰度" : "正式");
//        for (Integer i = 0; i < 100; i++) {log.info("发送消息,环境:{}", canaryRelease ? "灰度" : "正式");rocketMQTemplate.convertAndSend("TEST-DATA-MSG", body);
//        }}
}
消费端
@Slf4j
@Service
@RocketMQMessageListener(topic = "TEST-DATA-MSG", consumerGroup = "test-consumer-group")
public class MyConsumer1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {boolean canaryRelease = MessageStorage.isCanaryRelease();Map<String, String> properties = message.getProperties();log.info("received message: 【{}】 环境:【{}】 配置参数:【{}】 ", new String(message.getBody()),canaryRelease ? "灰度" : "正式",JSON.toJSONString(properties));}
}
发送灰度消息

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

相关文章:

SpringCloud 微服务消息队列灰度方案 (RocketMQ 4.x)

目录 背景遇到的问题 RocketMQ 基础基础消息模型扩展后的消息模型部署模型相关概念点 方案对比影子Topic的方案Tag的方案UserProperty的方案影子Group的方案灰度分区的方案方案对比 灰度分区方案设计适配只有部分灰度的情况所做的功能扩展消费者&#xff08;无灰度&#xff09;…...

厘清标准差和标准误:因果推断的统计学基础

标准差&#xff0c;指 一次抽样中 个体取值间的离散程度&#xff0c;反映了 个体取值对样本均值的代表性。 标准误&#xff0c;指 多次抽样中 样本均值间的离散程度&#xff0c;反映了 样本均值对总体均值的代表性。 公众号原文-厘清标准差和标准误&#xff1a;因果推断的统计…...

GESP4级考试语法知识(贪心算法(二))

排队接水2代码&#xff1a; #include<iostream> #include<cstdio> #include<algorithm> using namespace std; struct people {int num;int time; }; people s[1001]; int n,r,a[1001]; double sum,ave; bool cmp(people x,people y) {return x.time<y.t…...

MATLAB 使用教程 —— 命令窗口输入命令,工作区显示变量

命令在命令窗口输入变量在工作区显示 MATLAB 桌面包含的面板如下&#xff1a; 当前文件夹 - 此面板允许访问项目文件夹和文件。命令窗口 - 这是主要区域&#xff0c;用户在命令行中输入命令&#xff0c;命令提示符(>>).工作区 - 工作区显示所有变量&#xff0c;无论是创…...

LeetCode 热题100(八)【二叉树】(3)

目录 8.11二叉树展开为链表&#xff08;中等&#xff09; 8.12从前序与中序遍历序列构造二叉树&#xff08;中等&#xff09; 8.13路径总和III&#xff08;中等&#xff09; 8.14二叉树的最近公共祖先&#xff08;中等&#xff09; 8.15二叉树中的最大路径和&#xff08;困…...

uniapp h5实现录音

使用npm安装 npm install recorder-core引入Recorder库 可以使用import、require、html script等你适合的方式来引入js文件&#xff0c;下面的以import为主要参考&#xff0c;其他引入方式根据文件路径自行调整一下就可以了。 //必须引入的Recorder核心&#xff08;文件路径是…...

字节跳动Android面试题汇总及参考答案(80+面试题,持续更新)

Android 四大组件是什么? Android 四大组件分别是 Activity、Service、Broadcast Receiver 和 Content Provider。 Activity 是 Android 应用中最基本的组件,用于实现用户界面。它可以包含各种视图控件,如按钮、文本框等。一个 Activity 通常对应一个屏幕的内容。用户可以通…...

【go从零单排】通道select、通道timeout、Non-Blocking Channel Operations非阻塞通道操作

&#x1f308;Don’t worry , just coding! 内耗与overthinking只会削弱你的精力&#xff0c;虚度你的光阴&#xff0c;每天迈出一小步&#xff0c;回头时发现已经走了很远。 &#x1f4d7;概念 select 语句是 Go 的一种控制结构&#xff0c;用于等待多个通道操作。它类似于 s…...

PSRR仿真笔记

1.首先打开bandgap的testbench电路&#xff0c;选择schematic 2.打开电路后,选择VDD模块&#xff0c;然后按键盘Q&#xff0c;进行编辑&#xff0c;将AC magnitude改为1 V 3.修改完成后&#xff0c;点击左上角Launch > ADE Explorer 4.在出现的窗口中&#xff0c;选择Creat…...

AUTOSAR_EXP_ARAComAPI的7章笔记(3)

☞返回总目录 相关总结&#xff1a;AutoSar AP简单多绑定总结 7.3 多绑定 如在 5.4.3 小节中简要讨论的&#xff0c;某个代理类 / 骨架类的不同实例之间的技术传输是不同的&#xff0c;多绑定描述了这种情况的解决方案。多种技术原因都可能导致这种情况出现&#xff1a; 代…...

WSADATA 关键字详细介绍

WSADATA 是 Windows Sockets API&#xff08;Winsock&#xff09;中用于存储 Winsock 库的初始化信息的结构体。在使用 Winsock API 之前&#xff0c;必须通过调用 WSAStartup() 函数进行初始化&#xff0c;WSADATA 结构体用于接收有关 Winsock 库版本的信息。Winsock 是 Windo…...

Day44 | 动态规划 :状态机DP 买卖股票的最佳时机IV买卖股票的最佳时机III

Day44 | 动态规划 &#xff1a;状态机DP 买卖股票的最佳时机IV&&买卖股票的最佳时机III&&309.买卖股票的最佳时机含冷冻期 动态规划应该如何学习&#xff1f;-CSDN博客 本次题解参考自灵神的做法&#xff0c;大家也多多支持灵神的题解 买卖股票的最佳时机【…...

Area-Composition模型部署指南

一、介绍 本模型可以通过输入不同的提示词&#xff0c;然后根据各部分提示词进行融合生成图片。如下图&#xff1a; 此图像包含 4 个不同的区域&#xff1a;夜晚、傍晚、白天、早晨 二、部署 环境要求&#xff1a; 最低显存&#xff1a;10G 1. 部署ComfyUI 本篇的模型部署…...

策略模式、状态机详细解读

策略模式 (Strategy Pattern) 策略模式 (Strategy Pattern) 是一种行为型设计模式&#xff0c;旨在将一组算法封装成独立的类&#xff0c;使得它们可以相互替换。这种模式让算法的变化不会影响到使用算法的客户&#xff0c;减少了类之间的耦合。策略模式通常用于处理一类问题&…...

OpenWrt广播DNS到客户端

OpenWrt广播DNS到客户端 Network -> Interfaces -> lan ->DHCP Server -> Advanced Settings -> DHCP-Options 设置 6,dns1,dns2 如下图 也可以直接编辑 /etc/config/dhcp config dhcp lan list dhcp_option 6,119.29.29.29,223.5.5.5 #ipv4 option dns 2402:4…...

C++编程技巧与规范-类和对象

类和对象 1. 静态对象的探讨与全局对象的构造顺序 静态对象的探讨 类中的静态成员变量(类类型静态成员) 类中静态变量的声明与定义&#xff08;类中声明类外定义&#xff09; #include<iostream> using namespace std;namespace _nmspl {class A{public:A():m_i(5){…...

AutoHotKey自动热键AHK-正则表达式

在这个软件的操作中,基本都是需要即时的解决一些问题,所以对字符串的操作是比较多的,所以正则的使用还是比较重要的,接下来我们用一个例子来了解正则表达式的使用 str "7654321" RegExMatch(str, "65(43)(21)", SubPat)str ( str %str% SubPat %SubPa…...

【3D Slicer】的小白入门使用指南四

开源解剖影像浏览工具Open Anatomy Browser使用及介绍 和3D slicer米有太大关系,该工具是网页版影像数据的浏览工具(可以简单理解为网页版的3D slicer) 介绍 ● 开放解剖(OA)浏览器是由神经影像分析中心开发的,基于网络浏览器技术构建的图谱查看器。 ● OA浏览器将解剖模…...

flink同步mysql数据表到pg库

1.关闭防火墙和selinux systemctl stop firewalld systemctl disable firewalld systemctl status firewalldvi /etc/selinux/config 修改为disabled2.安装java8 yum list java-1.8* yum install java-1.8.0-openjdk* -yjava -version3.下载和部署postgresql 下载地址&#…...

AndroidStudio-常用布局

一、线性布局LinearLayout 线性布局内部的各视图有两种排列方式: 1.orientation属性值为horizontal时&#xff0c;内部视图在水平方向从左往右排列。 2.orientation属性值为vertical时&#xff0c;内部视图在垂直方向从上往下排列。 如果不指定orientation属性&#xff0c;…...

Vue全栈开发旅游网项目(10)-用户管理后端接口开发

1.异步用户登录\登出接口开发 1.设计公共响应数据类型 文件地址&#xff1a;utils/response404.py from django.http import JsonResponseclass BadRequestJsonResponse(JsonResponse):status_code 400def __init__(self, err_list, *args, **kwargs):data {"error_c…...

[Android]查找java类中声明为native方法的具体实现方法

在android代码中&#xff0c;经常可以看到native方法&#xff0c;需要查看其对应的C方法&#xff0c;这些方法是一一对应的&#xff0c;对应关系是在jni注册里关联起来的。 比较直观的是这样的例子&#xff0c; Parcel.java //Java层的方法里调用了native方法nativeWriteInt…...

Exploring Defeasible Reasoning in Large Language Models: A Chain-of-Thought A

文章目录 题目摘要简介准备工作数据集生成方法实验结论 题目 探索大型语言模型中的可废止推理&#xff1a;思路链 论文地址&#xff1a;http://collegepublications.co.uk/downloads/LNGAI00004.pdf#page136 摘要 许多大型语言模型 (LLM) 经过大量高质量数据语料库的训练&…...

uniapp在app模式下组件传值

在 UniApp 编译成 App 后&#xff0c;传递参数可以通过多种方式实现&#xff0c;常见的方式有以下几种&#xff1a; 1. 通过 URL 参数传递&#xff08;适用于 WebView&#xff09; 如果你的 App 页面通过 WebView 渲染&#xff0c;可以像在 Web 端一样通过 URL 传递参数。例如…...

Docker解决暴露2375端口引发的安全漏洞

docker的暴露api端口2375&#xff0c;没有任何安全防护&#xff0c;我们通过linux系统防火墙&#xff08;iptables&#xff09;来进行ip访问限制 # 查看iptables所有规则 iptables -L -nv # 只允许某个ip访问2375端口 iptables -I INPUT -s 127.0.0.1 -p tcp --dport 2375 -j A…...

HTML5+CSS前端开发【保姆级教学】+新闻文章初体验

Hello&#xff0c;各位编程猿们&#xff01;上一篇文章介绍了前端以及软件的安装&#xff0c;这一篇我们要继续讲解页面更多知识点&#xff0c;教大家做一篇新闻题材的文章 新闻文章 当我们点开浏览器经常看到各种各样的文章&#xff0c;今天我们就来看看大家最喜欢关注的体育…...

『VUE』26. props实现子组件传递数据给父组件(详细图文注释)

目录 本节内容示例代码总结 欢迎关注 『VUE』 专栏&#xff0c;持续更新中 欢迎关注 『VUE』 专栏&#xff0c;持续更新中 本节内容 父组件传子组件–props 子组件传父组件–自定义事件 本节讲子组件传父组件–通过props里的方法传递,就是父亲写了一个函数,给子组件调用,然后…...

RHCE-DNS域名解析服务器

一、DNS简介 DNS &#xff08; Domain Name System &#xff09;是互联网上的一项服务&#xff0c;它作为将域名和 IP 地址相互映射的一个分布式 数据库&#xff0c;能够使人更方便的访问互联网。 DNS 系统使用的是网络的查询&#xff0c;那么自然需要有监听的 port 。 DNS 使…...

移民统计年鉴(1996-2021年)

年鉴中包含了以下几个方面的数据&#xff1a; 移民数量&#xff1a;记录了每年全球移民的总数&#xff0c;以及不同国家和地区的移民流入和流出情况。 移民类型&#xff1a;区分了经济移民、难民、家庭团聚等不同类型的移民。 移民原因&#xff1a;分析了推动移民的各种因素&…...

MFC1(note)

引言 在学习SDK后我们发现&#xff0c;写消息好麻烦&#xff0c;处理消息更麻烦 处理消息效率低发送消息效率低 所以把SDK中这些消息全部封装好 MFC封装了windows 的大部分API 这里说一下QT架构跨平台 MFC用得如何取决于你SDK的水平 创建 如果打开没有MFC 一般勾选以下…...