缓存框架JetCache源码解析-缓存变更通知机制
为什么需要缓存变更通知机制?如果我们使用的是本地缓存或者多级缓存(本地缓存+远程缓存),当其中一个节点的本地缓存变更之后,为了保证缓存尽量的一致性,此时其他节点的本地缓存也需要去变更,这时候常用的做法就是需要接入一个通知机制,只要有节点的本地缓存变更了,那么这时候就广播出一个缓存变更消息,其他节点会去订阅这个消息,最终消费这个消息的时候把本地缓存的数据进行更新或删除。当然,通知机制的实现有很多,比如可以利用redis的发布订阅机制,也可以接入mq这种消息中间件作为消息通知,不过相比之下,mq就稍微重了点,所以JetCache则是使用redis的发布订阅机制去实现缓存变更通知机制
缓存监视器CacheMonitor
@FunctionalInterface
public interface CacheMonitor {void afterOperation(CacheEvent event);}
如果我们想要监控缓存的每一次操作,或者在缓存的每一次操作的时候进行一些埋点扩展,那么JetCache提供了CacheMonitor接口可以让我们用户自己去进行一些自定义的实现,当然在JetCache内部也有默认的一些缓存监视器的实现:
- DefaultCacheMonitor
该monitor主要是监控缓存的每一次操作,比如get,put,remove等,基于监控这些操作之下就可以采集到各种操作指标,例如缓存命中次数,获取缓存次数,加载数据次数等等,详细的采集的指标如下:
protected String cacheName;
protected long statStartTime;
protected long statEndTime;protected long getCount;
protected long getHitCount;
protected long getMissCount;
protected long getFailCount;
protected long getExpireCount;
protected long getTimeSum;
protected long minGetTime = Long.MAX_VALUE;
protected long maxGetTime = 0;protected long putCount;
protected long putSuccessCount;
protected long putFailCount;
protected long putTimeSum;
protected long minPutTime = Long.MAX_VALUE;
protected long maxPutTime = 0;protected long removeCount;
protected long removeSuccessCount;
protected long removeFailCount;
protected long removeTimeSum;
protected long minRemoveTime = Long.MAX_VALUE;
protected long maxRemoveTime = 0;protected long loadCount;
protected long loadSuccessCount;
protected long loadFailCount;
protected long loadTimeSum;
protected long minLoadTime = Long.MAX_VALUE;
protected long maxLoadTime = 0;
- CacheNotifyMonitor
该monitor看名字大概就知道它的作用是用来监控缓存变更之后发送通知的,监控的缓存操作有4种,分别是put,putAll,remove,removeAll,这4个操作都是会让缓存产生变更的操作,代码如下:
@Override
public void afterOperation(CacheEvent event) {if (this.broadcastManager == null) {return;}AbstractCache absCache = CacheUtil.getAbstractCache(event.getCache());if (absCache.isClosed()) {return;}AbstractEmbeddedCache localCache = getLocalCache(absCache);if (localCache == null) {return;}// put事件if (event instanceof CachePutEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CachePutEvent e = (CachePutEvent) event;m.setType(CacheMessage.TYPE_PUT);m.setKeys(new Object[]{convertKey(e.getKey(), localCache)});broadcastManager.publish(m);}// remove事件else if (event instanceof CacheRemoveEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CacheRemoveEvent e = (CacheRemoveEvent) event;m.setType(CacheMessage.TYPE_REMOVE);m.setKeys(new Object[]{convertKey(e.getKey(), localCache)});broadcastManager.publish(m);}// putAll事件else if (event instanceof CachePutAllEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CachePutAllEvent e = (CachePutAllEvent) event;m.setType(CacheMessage.TYPE_PUT_ALL);if (e.getMap() != null) {m.setKeys(e.getMap().keySet().stream().map(k -> convertKey(k, localCache)).toArray());}broadcastManager.publish(m);}// removeAll事件else if (event instanceof CacheRemoveAllEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CacheRemoveAllEvent e = (CacheRemoveAllEvent) event;m.setType(CacheMessage.TYPE_REMOVE_ALL);if (e.getKeys() != null) {m.setKeys(e.getKeys().stream().map(k -> convertKey(k, localCache)).toArray());}broadcastManager.publish(m);}
}
缓存变更消息发布器/订阅器BroadcastManager
上面我们知道了JetCache中会通过CacheNotifyMonitor来监控缓存的变更,但是发送缓存变更的消息通知则是交给BroadcastManager去做的。BroadcastManager是一个抽象类,提供了两个抽象方法:
/*** 发布缓存变更消息* @param cacheMessage cacheMessage*/
public abstract CacheResult publish(CacheMessage cacheMessage);/*** 订阅缓存变更消息*/
public abstract void startSubscribe();
publish方法是发布广播缓存变更消息的方法,传入的CacheMessage参数就是缓存变更消息,而startSubscribe方法则是订阅缓存变更消息的,这两个方法具体由子类进行实现,目前JetCache中主要提供了4种实现,这4种实现都是基于redis的订阅发布机制实现的,只是使用的redis客户端不一样的区别
- LettuceBroadcastManager
- RedisBroadcastManager
- RedissonBroadcastManager
- SpringDataBroadcastManager
我们这里可以以RedissonBroadcastManager为例,去看它是怎样实现缓存变更消息通知机制的
public class RedissonBroadcastManager extends BroadcastManager {private static final Logger logger = LoggerFactory.getLogger(RedissonBroadcastManager.class);private final RedissonCacheConfig<?, ?> config;private final String channel;private final RedissonClient client;private volatile int subscribeId;private final ReentrantLock reentrantLock = new ReentrantLock();public RedissonBroadcastManager(final CacheManager cacheManager, final RedissonCacheConfig<?, ?> config) {super(cacheManager);checkConfig(config);this.config = config;this.channel = config.getBroadcastChannel();this.client = config.getRedissonClient();}@Overridepublic void startSubscribe() {reentrantLock.lock();try {if (this.subscribeId == 0 && Objects.nonNull(this.channel) && !this.channel.isEmpty()) {this.subscribeId = this.client.getTopic(this.channel).addListener(byte[].class, (channel, msg) -> processNotification(msg, this.config.getValueDecoder()));}}finally {reentrantLock.unlock();}}@Overridepublic void close() {reentrantLock.lock();try {final int id;if ((id = this.subscribeId) > 0 && Objects.nonNull(this.channel)) {this.subscribeId = 0;try {this.client.getTopic(this.channel).removeListener(id);} catch (Throwable e) {logger.warn("unsubscribe {} fail", this.channel, e);}}}finally {reentrantLock.unlock();}}@Overridepublic CacheResult publish(final CacheMessage cacheMessage) {try {if (Objects.nonNull(this.channel) && Objects.nonNull(cacheMessage)) {final byte[] msg = this.config.getValueEncoder().apply(cacheMessage);this.client.getTopic(this.channel).publish(msg);return CacheResult.SUCCESS_WITHOUT_MSG;}return CacheResult.FAIL_WITHOUT_MSG;} catch (Throwable e) {SquashedLogger.getLogger(logger).error("jetcache publish error", e);return new CacheResult(e);}}
}
首先当调用Cache实例的put,putAll,remove,removeAll这些方法的时候,就会去触CacheNotifyMonitor,CacheNotifyMonitor这时候就会去创建一个缓存变更信息然后交给RedissonBroadcastManager去调用publish方法,在publish方法就会使用redisson的发布api去广播出这条缓存变更消息,当然订阅也是使用redisson的订阅api,当监听到有缓存变更消息的时候,这时候就会去回调processNotification方法,该方法是父类BroadcastManager的方法:
protected void processNotification(byte[] message, Function<byte[], Object> decoder) {try {if (message == null) {logger.error("notify message is null");return;}Object value = decoder.apply(message);if (value == null) {logger.error("notify message is null");return;}if (value instanceof CacheMessage) {processCacheMessage((CacheMessage) value);} else {logger.error("the message is not instance of CacheMessage, class={}", value.getClass());}} catch (Throwable e) {SquashedLogger.getLogger(logger).error("receive cache notify error", e);}
}private void processCacheMessage(CacheMessage cacheMessage) {// 条件成立:说明这个缓存消息是自己发送的,这时候不用处理if (sourceId.equals(cacheMessage.getSourceId())) {return;}// 根据area和cacheName从缓存管理器中获取到对应的Cache实例Cache cache = cacheManager.getCache(cacheMessage.getArea(), cacheMessage.getCacheName());if (cache == null) {logger.warn("Cache instance not exists: {},{}", cacheMessage.getArea(), cacheMessage.getCacheName());return;}// 如果这个Cache实例不是一个多级缓存的Cache实例,那么就直接return,因为这里主要是针对多级缓存的情况下进行处理的Cache absCache = CacheUtil.getAbstractCache(cache);if (!(absCache instanceof MultiLevelCache)) {logger.warn("Cache instance is not MultiLevelCache: {},{}", cacheMessage.getArea(), cacheMessage.getCacheName());return;}// 获取到多级缓存中所有的Cache实例Cache[] caches = ((MultiLevelCache) absCache).caches();// 获取到缓存有变动的keySet<Object> keys = Stream.of(cacheMessage.getKeys()).collect(Collectors.toSet());// 遍历所有的Cache实例for (Cache c : caches) {Cache localCache = CacheUtil.getAbstractCache(c);// 只针对本地缓存的Cache实例,把缓存有变动的key从本地缓存中移除if (localCache instanceof AbstractEmbeddedCache) {((AbstractEmbeddedCache) localCache).__removeAll(keys);} else {break;}}
}
最终会调用到processCacheMessage方法,需要注意的是如果是自己发送的消息则不需要再去消费,所以会使用一个sourceId去进行过滤,并且还会去判断当前的Cache实例是否是一个多级缓存实例,如果是多级缓存,那么就把这个多级缓存种的所有本地缓存都取出来,然后最后根据缓存变更消息里面的key把对应的本地缓存都remove掉,可能有人问为什么这里对本地缓存进行remove而不是update呢?因为如果使用update的话,还需要考虑并发的场景,比如这个key进行了两次更新,先后会发布两次缓存变更消息,但是订阅者去消费这两个消息的时候并不能去保证它们的先后顺序,此时就会有可能造成更新顺序的问题,但是使用remove就可能完全避免这种并发问题了
总结
在JetCache中提供了对于缓存操作的后置扩展接口,也叫做缓存监控器,我们可以自己去实现自己的缓存监控器来对缓存操作之后做一些自定义的功能,默认地JetCache提供了两个缓存监视器的实现,一个是采集缓存操作的一些指标信息DefaultCacheMonitor,一个是在缓存变更之后发送缓存变更通知的CacheNotifyMonitor,其中CacheNotifyMonitor在监控到缓存变更之后会去使用BroadcastManager去进行消息的发布订阅,BroadcastManager是发布者也是订阅者,在订阅消费的时会根据缓存变更消息里面的keys去把当前Cache实例的本地缓存进行remove,也就是依赖于这样的一个缓存变更消息通知机制,就可以保证当使用多级缓存的时候,多节点间的本地缓存尽量达成一致
相关文章:
缓存框架JetCache源码解析-缓存变更通知机制
为什么需要缓存变更通知机制?如果我们使用的是本地缓存或者多级缓存(本地缓存远程缓存),当其中一个节点的本地缓存变更之后,为了保证缓存尽量的一致性,此时其他节点的本地缓存也需要去变更,这时…...
Android 设置特定Activity内容顶部显示在状态栏底部,也就是状态栏的下层 以及封装一个方法修改状态栏颜色
推荐:https://github.com/gyf-dev/ImmersionBar 在 Android 中要实现特定 Activity 内容顶部显示在状态栏底部以及封装方法修改状态栏颜色,可以通过以下步骤来完成: 一、让 Activity 内容显示在状态栏底部 在 AndroidManifest.xml 文件中,为特…...

用自己的数据集复现YOLOv5
yolov5已经出了很多版本了,这里我以目前最新的版本为例,先在官网下载源码:GitHub - ultralytics/yolov5: YOLOv5 🚀 in PyTorch > ONNX > CoreML > TFLite 然后下载预训练模型,需要哪个就点击哪个模型就行&am…...

如何在博客中插入其他的博客链接(超简单)最新版
如何在博客中插入其他的博客链接 1.复制自己要添加的网址(组合键:Ctrlc)2. 点击超链接按钮3. 粘贴自己刚才复制的网址(组合键:Ctrlv)并点击确定即可4.让博客链接显示中文5.点击蓝字即可打开 1.复制自己要添…...

JS通过递归函数来剔除树结构特定节点
最近在处理权限类问题过程中,遇到多次需要过滤一下来列表的数据,针对不同用户看到的数据不同。记录一下 我的数据大致是这样的: class UserTree {constructor() {this.userTreeData [// 示例数据{ nodeid: "1", nodename: "R…...
javayufa
1.变量、运算符、表达式、输入输出 编写一个简单的Java程序–手速练习 public class Main { public static void main(String[] args) { System.out.println("Hello World"); } } 三、语法基础 变量 变量必须先定义,才可以使用。不能重名。 变量定义的方…...

软考-高级系统分析师知识点-补充篇
云计算 云计算的体系结构由5部分组成,分别为应用层,平台层,资源层,用户访问层和管理层,云计算的本质是通过网络提供服务,所以其体系结构以服务为核心。 系统的可靠性技术---容错技术---冗余技术 容错是指系…...

JavaScript全面指南(四)
🌈个人主页:前端青山 🔥系列专栏:JavaScript篇 🔖人终将被年少不可得之物困其一生 依旧青山,本期给大家带来JavaScript篇专栏内容:JavaScript全面指南 目录 61、如何防止XSRF攻击 62、如何判断一个对象是否为数组&…...
2024年诺贝尔物理学奖的创新之举
对于2024年诺贝尔物理学奖的这一创新之举,我的观点可以从以下几点展开: 跨学科融合的里程碑:将诺贝尔物理学奖颁发给机器学习与神经网络领域的研究者,标志着科学界对跨学科合作和融合的认可达到新高度。这不仅体现了理论物理与计算…...

FileLink内外网文件交换——致力企业高效安全文件共享
随着数字化转型的推进,企业之间的文件交流需求日益增加。然而,传统的文件传输方式往往无法满足速度和安全性的双重要求。FileLink作为一款专注于跨网文件交换的工具,致力于为企业提供高效、安全的文件共享解决方案。 应用场景一:项…...
使用Python在Jupyter Notebook中显示Markdown文本
使用Python在Jupyter Notebook中显示Markdown文本 引言1. 导入必要的模块2. 定义一个函数来显示Markdown文本3. 使用print_md函数显示Markdown文本4. 总结 引言 作为一名Python初级程序员,你可能已经熟悉了Jupyter Notebook这个强大的工具。Jupyter Notebook不仅支…...

G1 GAN生成MNIST手写数字图像
🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 G1 GAN生成MNIST手写数字图像 1. 生成对抗网络 (GAN) 简介 生成对抗网络 (GAN) 是一种通过“对抗性”学习生成数据的深度学习模型,通常用于生成…...

WPFDeveloper正式版发布
WPFDeveloper WPFDeveloper一个基于WPF自定义高级控件的WPF开发人员UI库,它提供了众多的自定义控件。 该项目的创建者和主要维护者是现役微软MVP 闫驚鏵: https://github.com/yanjinhuagood 该项目还有众多的维护者,详情可以访问github上的README&…...
实现鼠标经过某个元素时弹出提示框(通常称为“工具提示”或“悬浮提示”)
要实现鼠标经过某个元素时弹出提示框(通常称为“工具提示”或“悬浮提示”),你可以使用 JavaScript 结合 CSS 来创建这个效果。以下是详细步骤,包括 HTML、CSS 和 JavaScript 的代码示例。 HTML 结构 首先,创建一个简…...

【GAMES101笔记速查——Lecture 17 Materials and Appearances】
目录 1 材质和外观 1.1 自然界中,外观是光线和材质共同作用的结果 1.2 图形学中,什么是材质? 1.2.1 渲染方程严格正确,其中BRDF项决定了物体的材质 1.2.2 漫反射材质 (1)如何定义漫反射系数࿱…...

对于从vscode ssh到virtualBox的timeout记录
如题,解决方式如下: 1.把虚拟机关机退出来,在这个界面进行网络设置:选桥接网卡 2.然后再进系统,使用命令 ip addr查看如今的ip地址,应该和在本机里面看到的是一个网段 3.打开vscode,该干啥干…...

鸿蒙原生应用扬帆起航
就在2024年6月21日华为在开发者大会上发布了全新操作的系统HarmonyOS Next开发测试版,网友们把它称之为“称之为纯血鸿蒙”。因为在此之前鸿蒙系统底层式有两套基础架构的,一套是是Android的AOSP,一套是鸿蒙的Open Harmony,因为早…...
《计算机视觉》—— 表情识别
根据计算眼睛、嘴巴的变化,判断是什么表情结合以下两篇文章来理解表情识别的实现方法 基于 dilib 库的人脸检测 https://blog.csdn.net/weixin_73504499/article/details/142977202?spm1001.2014.3001.5501 基于 dlib 库的人脸关键点定位 https://blog.csdn.net/we…...

NVIDIA Aerial Omniverse
NVIDIA Aerial Omniverse 数字孪生助力打造新一代无线网络 文章目录 前言一、从链路级仿真到系统级仿真二、转变无线研发方式1. 开放且可定制的模块化平台2. 适用于 6G 标准化的 3GPP 兼容平台3. 部署前测试4. AI 和 ML 在数字孪生中的应用5. 高级物理精准的电磁求解器6. 合作伙…...
QT程序报错解决方案:Cannot queue arguments of type ‘QTextCharFormat‘ 或 ‘QTextCursor‘
项目场景: 项目场景:基于QT实现的C某程序,搭载在Linux环境中。 问题描述 执行程序时,发现log中报错如下内容: QObject::connect: Cannot queue arguments of type QTextCharFormat (Make sure QTextCharFormat is r…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...

国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...

简易版抽奖活动的设计技术方案
1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
蓝桥杯 2024 15届国赛 A组 儿童节快乐
P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡,轻快的音乐在耳边持续回荡,小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下,六一来了。 今天是六一儿童节,小蓝老师为了让大家在节…...
多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验
一、多模态商品数据接口的技术架构 (一)多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如,当用户上传一张“蓝色连衣裙”的图片时,接口可自动提取图像中的颜色(RGB值&…...
Frozen-Flask :将 Flask 应用“冻结”为静态文件
Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是:将一个 Flask Web 应用生成成纯静态 HTML 文件,从而可以部署到静态网站托管服务上,如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...