缓存框架JetCache源码解析-缓存变更通知机制
为什么需要缓存变更通知机制?如果我们使用的是本地缓存或者多级缓存(本地缓存+远程缓存),当其中一个节点的本地缓存变更之后,为了保证缓存尽量的一致性,此时其他节点的本地缓存也需要去变更,这时候常用的做法就是需要接入一个通知机制,只要有节点的本地缓存变更了,那么这时候就广播出一个缓存变更消息,其他节点会去订阅这个消息,最终消费这个消息的时候把本地缓存的数据进行更新或删除。当然,通知机制的实现有很多,比如可以利用redis的发布订阅机制,也可以接入mq这种消息中间件作为消息通知,不过相比之下,mq就稍微重了点,所以JetCache则是使用redis的发布订阅机制去实现缓存变更通知机制
缓存监视器CacheMonitor
@FunctionalInterface
public interface CacheMonitor {void afterOperation(CacheEvent event);}
如果我们想要监控缓存的每一次操作,或者在缓存的每一次操作的时候进行一些埋点扩展,那么JetCache提供了CacheMonitor接口可以让我们用户自己去进行一些自定义的实现,当然在JetCache内部也有默认的一些缓存监视器的实现:
- DefaultCacheMonitor
该monitor主要是监控缓存的每一次操作,比如get,put,remove等,基于监控这些操作之下就可以采集到各种操作指标,例如缓存命中次数,获取缓存次数,加载数据次数等等,详细的采集的指标如下:
protected String cacheName;
protected long statStartTime;
protected long statEndTime;protected long getCount;
protected long getHitCount;
protected long getMissCount;
protected long getFailCount;
protected long getExpireCount;
protected long getTimeSum;
protected long minGetTime = Long.MAX_VALUE;
protected long maxGetTime = 0;protected long putCount;
protected long putSuccessCount;
protected long putFailCount;
protected long putTimeSum;
protected long minPutTime = Long.MAX_VALUE;
protected long maxPutTime = 0;protected long removeCount;
protected long removeSuccessCount;
protected long removeFailCount;
protected long removeTimeSum;
protected long minRemoveTime = Long.MAX_VALUE;
protected long maxRemoveTime = 0;protected long loadCount;
protected long loadSuccessCount;
protected long loadFailCount;
protected long loadTimeSum;
protected long minLoadTime = Long.MAX_VALUE;
protected long maxLoadTime = 0;
- CacheNotifyMonitor
该monitor看名字大概就知道它的作用是用来监控缓存变更之后发送通知的,监控的缓存操作有4种,分别是put,putAll,remove,removeAll,这4个操作都是会让缓存产生变更的操作,代码如下:
@Override
public void afterOperation(CacheEvent event) {if (this.broadcastManager == null) {return;}AbstractCache absCache = CacheUtil.getAbstractCache(event.getCache());if (absCache.isClosed()) {return;}AbstractEmbeddedCache localCache = getLocalCache(absCache);if (localCache == null) {return;}// put事件if (event instanceof CachePutEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CachePutEvent e = (CachePutEvent) event;m.setType(CacheMessage.TYPE_PUT);m.setKeys(new Object[]{convertKey(e.getKey(), localCache)});broadcastManager.publish(m);}// remove事件else if (event instanceof CacheRemoveEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CacheRemoveEvent e = (CacheRemoveEvent) event;m.setType(CacheMessage.TYPE_REMOVE);m.setKeys(new Object[]{convertKey(e.getKey(), localCache)});broadcastManager.publish(m);}// putAll事件else if (event instanceof CachePutAllEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CachePutAllEvent e = (CachePutAllEvent) event;m.setType(CacheMessage.TYPE_PUT_ALL);if (e.getMap() != null) {m.setKeys(e.getMap().keySet().stream().map(k -> convertKey(k, localCache)).toArray());}broadcastManager.publish(m);}// removeAll事件else if (event instanceof CacheRemoveAllEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CacheRemoveAllEvent e = (CacheRemoveAllEvent) event;m.setType(CacheMessage.TYPE_REMOVE_ALL);if (e.getKeys() != null) {m.setKeys(e.getKeys().stream().map(k -> convertKey(k, localCache)).toArray());}broadcastManager.publish(m);}
}
缓存变更消息发布器/订阅器BroadcastManager
上面我们知道了JetCache中会通过CacheNotifyMonitor来监控缓存的变更,但是发送缓存变更的消息通知则是交给BroadcastManager去做的。BroadcastManager是一个抽象类,提供了两个抽象方法:
/*** 发布缓存变更消息* @param cacheMessage cacheMessage*/
public abstract CacheResult publish(CacheMessage cacheMessage);/*** 订阅缓存变更消息*/
public abstract void startSubscribe();
publish方法是发布广播缓存变更消息的方法,传入的CacheMessage参数就是缓存变更消息,而startSubscribe方法则是订阅缓存变更消息的,这两个方法具体由子类进行实现,目前JetCache中主要提供了4种实现,这4种实现都是基于redis的订阅发布机制实现的,只是使用的redis客户端不一样的区别
- LettuceBroadcastManager
- RedisBroadcastManager
- RedissonBroadcastManager
- SpringDataBroadcastManager
我们这里可以以RedissonBroadcastManager为例,去看它是怎样实现缓存变更消息通知机制的
public class RedissonBroadcastManager extends BroadcastManager {private static final Logger logger = LoggerFactory.getLogger(RedissonBroadcastManager.class);private final RedissonCacheConfig<?, ?> config;private final String channel;private final RedissonClient client;private volatile int subscribeId;private final ReentrantLock reentrantLock = new ReentrantLock();public RedissonBroadcastManager(final CacheManager cacheManager, final RedissonCacheConfig<?, ?> config) {super(cacheManager);checkConfig(config);this.config = config;this.channel = config.getBroadcastChannel();this.client = config.getRedissonClient();}@Overridepublic void startSubscribe() {reentrantLock.lock();try {if (this.subscribeId == 0 && Objects.nonNull(this.channel) && !this.channel.isEmpty()) {this.subscribeId = this.client.getTopic(this.channel).addListener(byte[].class, (channel, msg) -> processNotification(msg, this.config.getValueDecoder()));}}finally {reentrantLock.unlock();}}@Overridepublic void close() {reentrantLock.lock();try {final int id;if ((id = this.subscribeId) > 0 && Objects.nonNull(this.channel)) {this.subscribeId = 0;try {this.client.getTopic(this.channel).removeListener(id);} catch (Throwable e) {logger.warn("unsubscribe {} fail", this.channel, e);}}}finally {reentrantLock.unlock();}}@Overridepublic CacheResult publish(final CacheMessage cacheMessage) {try {if (Objects.nonNull(this.channel) && Objects.nonNull(cacheMessage)) {final byte[] msg = this.config.getValueEncoder().apply(cacheMessage);this.client.getTopic(this.channel).publish(msg);return CacheResult.SUCCESS_WITHOUT_MSG;}return CacheResult.FAIL_WITHOUT_MSG;} catch (Throwable e) {SquashedLogger.getLogger(logger).error("jetcache publish error", e);return new CacheResult(e);}}
}
首先当调用Cache实例的put,putAll,remove,removeAll这些方法的时候,就会去触CacheNotifyMonitor,CacheNotifyMonitor这时候就会去创建一个缓存变更信息然后交给RedissonBroadcastManager去调用publish方法,在publish方法就会使用redisson的发布api去广播出这条缓存变更消息,当然订阅也是使用redisson的订阅api,当监听到有缓存变更消息的时候,这时候就会去回调processNotification方法,该方法是父类BroadcastManager的方法:
protected void processNotification(byte[] message, Function<byte[], Object> decoder) {try {if (message == null) {logger.error("notify message is null");return;}Object value = decoder.apply(message);if (value == null) {logger.error("notify message is null");return;}if (value instanceof CacheMessage) {processCacheMessage((CacheMessage) value);} else {logger.error("the message is not instance of CacheMessage, class={}", value.getClass());}} catch (Throwable e) {SquashedLogger.getLogger(logger).error("receive cache notify error", e);}
}private void processCacheMessage(CacheMessage cacheMessage) {// 条件成立:说明这个缓存消息是自己发送的,这时候不用处理if (sourceId.equals(cacheMessage.getSourceId())) {return;}// 根据area和cacheName从缓存管理器中获取到对应的Cache实例Cache cache = cacheManager.getCache(cacheMessage.getArea(), cacheMessage.getCacheName());if (cache == null) {logger.warn("Cache instance not exists: {},{}", cacheMessage.getArea(), cacheMessage.getCacheName());return;}// 如果这个Cache实例不是一个多级缓存的Cache实例,那么就直接return,因为这里主要是针对多级缓存的情况下进行处理的Cache absCache = CacheUtil.getAbstractCache(cache);if (!(absCache instanceof MultiLevelCache)) {logger.warn("Cache instance is not MultiLevelCache: {},{}", cacheMessage.getArea(), cacheMessage.getCacheName());return;}// 获取到多级缓存中所有的Cache实例Cache[] caches = ((MultiLevelCache) absCache).caches();// 获取到缓存有变动的keySet<Object> keys = Stream.of(cacheMessage.getKeys()).collect(Collectors.toSet());// 遍历所有的Cache实例for (Cache c : caches) {Cache localCache = CacheUtil.getAbstractCache(c);// 只针对本地缓存的Cache实例,把缓存有变动的key从本地缓存中移除if (localCache instanceof AbstractEmbeddedCache) {((AbstractEmbeddedCache) localCache).__removeAll(keys);} else {break;}}
}
最终会调用到processCacheMessage方法,需要注意的是如果是自己发送的消息则不需要再去消费,所以会使用一个sourceId去进行过滤,并且还会去判断当前的Cache实例是否是一个多级缓存实例,如果是多级缓存,那么就把这个多级缓存种的所有本地缓存都取出来,然后最后根据缓存变更消息里面的key把对应的本地缓存都remove掉,可能有人问为什么这里对本地缓存进行remove而不是update呢?因为如果使用update的话,还需要考虑并发的场景,比如这个key进行了两次更新,先后会发布两次缓存变更消息,但是订阅者去消费这两个消息的时候并不能去保证它们的先后顺序,此时就会有可能造成更新顺序的问题,但是使用remove就可能完全避免这种并发问题了
总结
在JetCache中提供了对于缓存操作的后置扩展接口,也叫做缓存监控器,我们可以自己去实现自己的缓存监控器来对缓存操作之后做一些自定义的功能,默认地JetCache提供了两个缓存监视器的实现,一个是采集缓存操作的一些指标信息DefaultCacheMonitor,一个是在缓存变更之后发送缓存变更通知的CacheNotifyMonitor,其中CacheNotifyMonitor在监控到缓存变更之后会去使用BroadcastManager去进行消息的发布订阅,BroadcastManager是发布者也是订阅者,在订阅消费的时会根据缓存变更消息里面的keys去把当前Cache实例的本地缓存进行remove,也就是依赖于这样的一个缓存变更消息通知机制,就可以保证当使用多级缓存的时候,多节点间的本地缓存尽量达成一致
相关文章:
缓存框架JetCache源码解析-缓存变更通知机制
为什么需要缓存变更通知机制?如果我们使用的是本地缓存或者多级缓存(本地缓存远程缓存),当其中一个节点的本地缓存变更之后,为了保证缓存尽量的一致性,此时其他节点的本地缓存也需要去变更,这时…...
Android 设置特定Activity内容顶部显示在状态栏底部,也就是状态栏的下层 以及封装一个方法修改状态栏颜色
推荐:https://github.com/gyf-dev/ImmersionBar 在 Android 中要实现特定 Activity 内容顶部显示在状态栏底部以及封装方法修改状态栏颜色,可以通过以下步骤来完成: 一、让 Activity 内容显示在状态栏底部 在 AndroidManifest.xml 文件中,为特…...
用自己的数据集复现YOLOv5
yolov5已经出了很多版本了,这里我以目前最新的版本为例,先在官网下载源码:GitHub - ultralytics/yolov5: YOLOv5 🚀 in PyTorch > ONNX > CoreML > TFLite 然后下载预训练模型,需要哪个就点击哪个模型就行&am…...
如何在博客中插入其他的博客链接(超简单)最新版
如何在博客中插入其他的博客链接 1.复制自己要添加的网址(组合键:Ctrlc)2. 点击超链接按钮3. 粘贴自己刚才复制的网址(组合键:Ctrlv)并点击确定即可4.让博客链接显示中文5.点击蓝字即可打开 1.复制自己要添…...
JS通过递归函数来剔除树结构特定节点
最近在处理权限类问题过程中,遇到多次需要过滤一下来列表的数据,针对不同用户看到的数据不同。记录一下 我的数据大致是这样的: class UserTree {constructor() {this.userTreeData [// 示例数据{ nodeid: "1", nodename: "R…...
javayufa
1.变量、运算符、表达式、输入输出 编写一个简单的Java程序–手速练习 public class Main { public static void main(String[] args) { System.out.println("Hello World"); } } 三、语法基础 变量 变量必须先定义,才可以使用。不能重名。 变量定义的方…...
软考-高级系统分析师知识点-补充篇
云计算 云计算的体系结构由5部分组成,分别为应用层,平台层,资源层,用户访问层和管理层,云计算的本质是通过网络提供服务,所以其体系结构以服务为核心。 系统的可靠性技术---容错技术---冗余技术 容错是指系…...
JavaScript全面指南(四)
🌈个人主页:前端青山 🔥系列专栏:JavaScript篇 🔖人终将被年少不可得之物困其一生 依旧青山,本期给大家带来JavaScript篇专栏内容:JavaScript全面指南 目录 61、如何防止XSRF攻击 62、如何判断一个对象是否为数组&…...
2024年诺贝尔物理学奖的创新之举
对于2024年诺贝尔物理学奖的这一创新之举,我的观点可以从以下几点展开: 跨学科融合的里程碑:将诺贝尔物理学奖颁发给机器学习与神经网络领域的研究者,标志着科学界对跨学科合作和融合的认可达到新高度。这不仅体现了理论物理与计算…...
FileLink内外网文件交换——致力企业高效安全文件共享
随着数字化转型的推进,企业之间的文件交流需求日益增加。然而,传统的文件传输方式往往无法满足速度和安全性的双重要求。FileLink作为一款专注于跨网文件交换的工具,致力于为企业提供高效、安全的文件共享解决方案。 应用场景一:项…...
使用Python在Jupyter Notebook中显示Markdown文本
使用Python在Jupyter Notebook中显示Markdown文本 引言1. 导入必要的模块2. 定义一个函数来显示Markdown文本3. 使用print_md函数显示Markdown文本4. 总结 引言 作为一名Python初级程序员,你可能已经熟悉了Jupyter Notebook这个强大的工具。Jupyter Notebook不仅支…...
G1 GAN生成MNIST手写数字图像
🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 G1 GAN生成MNIST手写数字图像 1. 生成对抗网络 (GAN) 简介 生成对抗网络 (GAN) 是一种通过“对抗性”学习生成数据的深度学习模型,通常用于生成…...
WPFDeveloper正式版发布
WPFDeveloper WPFDeveloper一个基于WPF自定义高级控件的WPF开发人员UI库,它提供了众多的自定义控件。 该项目的创建者和主要维护者是现役微软MVP 闫驚鏵: https://github.com/yanjinhuagood 该项目还有众多的维护者,详情可以访问github上的README&…...
实现鼠标经过某个元素时弹出提示框(通常称为“工具提示”或“悬浮提示”)
要实现鼠标经过某个元素时弹出提示框(通常称为“工具提示”或“悬浮提示”),你可以使用 JavaScript 结合 CSS 来创建这个效果。以下是详细步骤,包括 HTML、CSS 和 JavaScript 的代码示例。 HTML 结构 首先,创建一个简…...
【GAMES101笔记速查——Lecture 17 Materials and Appearances】
目录 1 材质和外观 1.1 自然界中,外观是光线和材质共同作用的结果 1.2 图形学中,什么是材质? 1.2.1 渲染方程严格正确,其中BRDF项决定了物体的材质 1.2.2 漫反射材质 (1)如何定义漫反射系数࿱…...
对于从vscode ssh到virtualBox的timeout记录
如题,解决方式如下: 1.把虚拟机关机退出来,在这个界面进行网络设置:选桥接网卡 2.然后再进系统,使用命令 ip addr查看如今的ip地址,应该和在本机里面看到的是一个网段 3.打开vscode,该干啥干…...
鸿蒙原生应用扬帆起航
就在2024年6月21日华为在开发者大会上发布了全新操作的系统HarmonyOS Next开发测试版,网友们把它称之为“称之为纯血鸿蒙”。因为在此之前鸿蒙系统底层式有两套基础架构的,一套是是Android的AOSP,一套是鸿蒙的Open Harmony,因为早…...
《计算机视觉》—— 表情识别
根据计算眼睛、嘴巴的变化,判断是什么表情结合以下两篇文章来理解表情识别的实现方法 基于 dilib 库的人脸检测 https://blog.csdn.net/weixin_73504499/article/details/142977202?spm1001.2014.3001.5501 基于 dlib 库的人脸关键点定位 https://blog.csdn.net/we…...
NVIDIA Aerial Omniverse
NVIDIA Aerial Omniverse 数字孪生助力打造新一代无线网络 文章目录 前言一、从链路级仿真到系统级仿真二、转变无线研发方式1. 开放且可定制的模块化平台2. 适用于 6G 标准化的 3GPP 兼容平台3. 部署前测试4. AI 和 ML 在数字孪生中的应用5. 高级物理精准的电磁求解器6. 合作伙…...
QT程序报错解决方案:Cannot queue arguments of type ‘QTextCharFormat‘ 或 ‘QTextCursor‘
项目场景: 项目场景:基于QT实现的C某程序,搭载在Linux环境中。 问题描述 执行程序时,发现log中报错如下内容: QObject::connect: Cannot queue arguments of type QTextCharFormat (Make sure QTextCharFormat is r…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
19c补丁后oracle属主变化,导致不能识别磁盘组
补丁后服务器重启,数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后,存在与用户组权限相关的问题。具体表现为,Oracle 实例的运行用户(oracle)和集…...
51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...
解锁数据库简洁之道:FastAPI与SQLModel实战指南
在构建现代Web应用程序时,与数据库的交互无疑是核心环节。虽然传统的数据库操作方式(如直接编写SQL语句与psycopg2交互)赋予了我们精细的控制权,但在面对日益复杂的业务逻辑和快速迭代的需求时,这种方式的开发效率和可…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...
FFmpeg avformat_open_input函数分析
函数内部的总体流程如下: avformat_open_input 精简后的代码如下: int avformat_open_input(AVFormatContext **ps, const char *filename,ff_const59 AVInputFormat *fmt, AVDictionary **options) {AVFormatContext *s *ps;int i, ret 0;AVDictio…...
基于江科大stm32屏幕驱动,实现OLED多级菜单(动画效果),结构体链表实现(独创源码)
引言 在嵌入式系统中,用户界面的设计往往直接影响到用户体验。本文将以STM32微控制器和OLED显示屏为例,介绍如何实现一个多级菜单系统。该系统支持用户通过按键导航菜单,执行相应操作,并提供平滑的滚动动画效果。 本文设计了一个…...
