当前位置: 首页 > news >正文

缓存框架JetCache源码解析-缓存变更通知机制

为什么需要缓存变更通知机制?如果我们使用的是本地缓存或者多级缓存(本地缓存+远程缓存),当其中一个节点的本地缓存变更之后,为了保证缓存尽量的一致性,此时其他节点的本地缓存也需要去变更,这时候常用的做法就是需要接入一个通知机制,只要有节点的本地缓存变更了,那么这时候就广播出一个缓存变更消息,其他节点会去订阅这个消息,最终消费这个消息的时候把本地缓存的数据进行更新或删除。当然,通知机制的实现有很多,比如可以利用redis的发布订阅机制,也可以接入mq这种消息中间件作为消息通知,不过相比之下,mq就稍微重了点,所以JetCache则是使用redis的发布订阅机制去实现缓存变更通知机制

缓存监视器CacheMonitor

@FunctionalInterface
public interface CacheMonitor {void afterOperation(CacheEvent event);}

如果我们想要监控缓存的每一次操作,或者在缓存的每一次操作的时候进行一些埋点扩展,那么JetCache提供了CacheMonitor接口可以让我们用户自己去进行一些自定义的实现,当然在JetCache内部也有默认的一些缓存监视器的实现:

  • DefaultCacheMonitor

该monitor主要是监控缓存的每一次操作,比如get,put,remove等,基于监控这些操作之下就可以采集到各种操作指标,例如缓存命中次数,获取缓存次数,加载数据次数等等,详细的采集的指标如下:

protected String cacheName;
protected long statStartTime;
protected long statEndTime;protected long getCount;
protected long getHitCount;
protected long getMissCount;
protected long getFailCount;
protected long getExpireCount;
protected long getTimeSum;
protected long minGetTime = Long.MAX_VALUE;
protected long maxGetTime = 0;protected long putCount;
protected long putSuccessCount;
protected long putFailCount;
protected long putTimeSum;
protected long minPutTime = Long.MAX_VALUE;
protected long maxPutTime = 0;protected long removeCount;
protected long removeSuccessCount;
protected long removeFailCount;
protected long removeTimeSum;
protected long minRemoveTime = Long.MAX_VALUE;
protected long maxRemoveTime = 0;protected long loadCount;
protected long loadSuccessCount;
protected long loadFailCount;
protected long loadTimeSum;
protected long minLoadTime = Long.MAX_VALUE;
protected long maxLoadTime = 0;
  • CacheNotifyMonitor

该monitor看名字大概就知道它的作用是用来监控缓存变更之后发送通知的,监控的缓存操作有4种,分别是put,putAll,remove,removeAll,这4个操作都是会让缓存产生变更的操作,代码如下:

@Override
public void afterOperation(CacheEvent event) {if (this.broadcastManager == null) {return;}AbstractCache absCache = CacheUtil.getAbstractCache(event.getCache());if (absCache.isClosed()) {return;}AbstractEmbeddedCache localCache = getLocalCache(absCache);if (localCache == null) {return;}// put事件if (event instanceof CachePutEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CachePutEvent e = (CachePutEvent) event;m.setType(CacheMessage.TYPE_PUT);m.setKeys(new Object[]{convertKey(e.getKey(), localCache)});broadcastManager.publish(m);}// remove事件else if (event instanceof CacheRemoveEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CacheRemoveEvent e = (CacheRemoveEvent) event;m.setType(CacheMessage.TYPE_REMOVE);m.setKeys(new Object[]{convertKey(e.getKey(), localCache)});broadcastManager.publish(m);}// putAll事件else if (event instanceof CachePutAllEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CachePutAllEvent e = (CachePutAllEvent) event;m.setType(CacheMessage.TYPE_PUT_ALL);if (e.getMap() != null) {m.setKeys(e.getMap().keySet().stream().map(k -> convertKey(k, localCache)).toArray());}broadcastManager.publish(m);}// removeAll事件else if (event instanceof CacheRemoveAllEvent) {CacheMessage m = new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CacheRemoveAllEvent e = (CacheRemoveAllEvent) event;m.setType(CacheMessage.TYPE_REMOVE_ALL);if (e.getKeys() != null) {m.setKeys(e.getKeys().stream().map(k -> convertKey(k, localCache)).toArray());}broadcastManager.publish(m);}
}

缓存变更消息发布器/订阅器BroadcastManager

上面我们知道了JetCache中会通过CacheNotifyMonitor来监控缓存的变更,但是发送缓存变更的消息通知则是交给BroadcastManager去做的。BroadcastManager是一个抽象类,提供了两个抽象方法:

/*** 发布缓存变更消息* @param cacheMessage  cacheMessage*/
public abstract CacheResult publish(CacheMessage cacheMessage);/*** 订阅缓存变更消息*/
public abstract void startSubscribe();

publish方法是发布广播缓存变更消息的方法,传入的CacheMessage参数就是缓存变更消息,而startSubscribe方法则是订阅缓存变更消息的,这两个方法具体由子类进行实现,目前JetCache中主要提供了4种实现,这4种实现都是基于redis的订阅发布机制实现的,只是使用的redis客户端不一样的区别

  • LettuceBroadcastManager
  • RedisBroadcastManager
  • RedissonBroadcastManager
  • SpringDataBroadcastManager

我们这里可以以RedissonBroadcastManager为例,去看它是怎样实现缓存变更消息通知机制的

public class RedissonBroadcastManager extends BroadcastManager {private static final Logger logger = LoggerFactory.getLogger(RedissonBroadcastManager.class);private final RedissonCacheConfig<?, ?> config;private final String channel;private final RedissonClient client;private volatile int subscribeId;private final ReentrantLock reentrantLock = new ReentrantLock();public RedissonBroadcastManager(final CacheManager cacheManager, final RedissonCacheConfig<?, ?> config) {super(cacheManager);checkConfig(config);this.config = config;this.channel = config.getBroadcastChannel();this.client = config.getRedissonClient();}@Overridepublic void startSubscribe() {reentrantLock.lock();try {if (this.subscribeId == 0 && Objects.nonNull(this.channel) && !this.channel.isEmpty()) {this.subscribeId = this.client.getTopic(this.channel).addListener(byte[].class, (channel, msg) -> processNotification(msg, this.config.getValueDecoder()));}}finally {reentrantLock.unlock();}}@Overridepublic void close() {reentrantLock.lock();try {final int id;if ((id = this.subscribeId) > 0 && Objects.nonNull(this.channel)) {this.subscribeId = 0;try {this.client.getTopic(this.channel).removeListener(id);} catch (Throwable e) {logger.warn("unsubscribe {} fail", this.channel, e);}}}finally {reentrantLock.unlock();}}@Overridepublic CacheResult publish(final CacheMessage cacheMessage) {try {if (Objects.nonNull(this.channel) && Objects.nonNull(cacheMessage)) {final byte[] msg = this.config.getValueEncoder().apply(cacheMessage);this.client.getTopic(this.channel).publish(msg);return CacheResult.SUCCESS_WITHOUT_MSG;}return CacheResult.FAIL_WITHOUT_MSG;} catch (Throwable e) {SquashedLogger.getLogger(logger).error("jetcache publish error", e);return new CacheResult(e);}}
}

首先当调用Cache实例的put,putAll,remove,removeAll这些方法的时候,就会去触CacheNotifyMonitor,CacheNotifyMonitor这时候就会去创建一个缓存变更信息然后交给RedissonBroadcastManager去调用publish方法,在publish方法就会使用redisson的发布api去广播出这条缓存变更消息,当然订阅也是使用redisson的订阅api,当监听到有缓存变更消息的时候,这时候就会去回调processNotification方法,该方法是父类BroadcastManager的方法:

protected void processNotification(byte[] message, Function<byte[], Object> decoder) {try {if (message == null) {logger.error("notify message is null");return;}Object value = decoder.apply(message);if (value == null) {logger.error("notify message is null");return;}if (value instanceof CacheMessage) {processCacheMessage((CacheMessage) value);} else {logger.error("the message is not instance of CacheMessage, class={}", value.getClass());}} catch (Throwable e) {SquashedLogger.getLogger(logger).error("receive cache notify error", e);}
}private void processCacheMessage(CacheMessage cacheMessage) {// 条件成立:说明这个缓存消息是自己发送的,这时候不用处理if (sourceId.equals(cacheMessage.getSourceId())) {return;}// 根据area和cacheName从缓存管理器中获取到对应的Cache实例Cache cache = cacheManager.getCache(cacheMessage.getArea(), cacheMessage.getCacheName());if (cache == null) {logger.warn("Cache instance not exists: {},{}", cacheMessage.getArea(), cacheMessage.getCacheName());return;}// 如果这个Cache实例不是一个多级缓存的Cache实例,那么就直接return,因为这里主要是针对多级缓存的情况下进行处理的Cache absCache = CacheUtil.getAbstractCache(cache);if (!(absCache instanceof MultiLevelCache)) {logger.warn("Cache instance is not MultiLevelCache: {},{}", cacheMessage.getArea(), cacheMessage.getCacheName());return;}// 获取到多级缓存中所有的Cache实例Cache[] caches = ((MultiLevelCache) absCache).caches();// 获取到缓存有变动的keySet<Object> keys = Stream.of(cacheMessage.getKeys()).collect(Collectors.toSet());// 遍历所有的Cache实例for (Cache c : caches) {Cache localCache = CacheUtil.getAbstractCache(c);// 只针对本地缓存的Cache实例,把缓存有变动的key从本地缓存中移除if (localCache instanceof AbstractEmbeddedCache) {((AbstractEmbeddedCache) localCache).__removeAll(keys);} else {break;}}
}

最终会调用到processCacheMessage方法,需要注意的是如果是自己发送的消息则不需要再去消费,所以会使用一个sourceId去进行过滤,并且还会去判断当前的Cache实例是否是一个多级缓存实例,如果是多级缓存,那么就把这个多级缓存种的所有本地缓存都取出来,然后最后根据缓存变更消息里面的key把对应的本地缓存都remove掉,可能有人问为什么这里对本地缓存进行remove而不是update呢?因为如果使用update的话,还需要考虑并发的场景,比如这个key进行了两次更新,先后会发布两次缓存变更消息,但是订阅者去消费这两个消息的时候并不能去保证它们的先后顺序,此时就会有可能造成更新顺序的问题,但是使用remove就可能完全避免这种并发问题了

总结

在JetCache中提供了对于缓存操作的后置扩展接口,也叫做缓存监控器,我们可以自己去实现自己的缓存监控器来对缓存操作之后做一些自定义的功能,默认地JetCache提供了两个缓存监视器的实现,一个是采集缓存操作的一些指标信息DefaultCacheMonitor,一个是在缓存变更之后发送缓存变更通知的CacheNotifyMonitor,其中CacheNotifyMonitor在监控到缓存变更之后会去使用BroadcastManager去进行消息的发布订阅,BroadcastManager是发布者也是订阅者,在订阅消费的时会根据缓存变更消息里面的keys去把当前Cache实例的本地缓存进行remove,也就是依赖于这样的一个缓存变更消息通知机制,就可以保证当使用多级缓存的时候,多节点间的本地缓存尽量达成一致

相关文章:

缓存框架JetCache源码解析-缓存变更通知机制

为什么需要缓存变更通知机制&#xff1f;如果我们使用的是本地缓存或者多级缓存&#xff08;本地缓存远程缓存&#xff09;&#xff0c;当其中一个节点的本地缓存变更之后&#xff0c;为了保证缓存尽量的一致性&#xff0c;此时其他节点的本地缓存也需要去变更&#xff0c;这时…...

Android 设置特定Activity内容顶部显示在状态栏底部,也就是状态栏的下层 以及封装一个方法修改状态栏颜色

推荐:https://github.com/gyf-dev/ImmersionBar 在 Android 中要实现特定 Activity 内容顶部显示在状态栏底部以及封装方法修改状态栏颜色&#xff0c;可以通过以下步骤来完成&#xff1a; 一、让 Activity 内容显示在状态栏底部 在 AndroidManifest.xml 文件中&#xff0c;为特…...

用自己的数据集复现YOLOv5

yolov5已经出了很多版本了&#xff0c;这里我以目前最新的版本为例&#xff0c;先在官网下载源码&#xff1a;GitHub - ultralytics/yolov5: YOLOv5 &#x1f680; in PyTorch > ONNX > CoreML > TFLite 然后下载预训练模型&#xff0c;需要哪个就点击哪个模型就行&am…...

如何在博客中插入其他的博客链接(超简单)最新版

如何在博客中插入其他的博客链接 1.复制自己要添加的网址&#xff08;组合键&#xff1a;Ctrlc&#xff09;2. 点击超链接按钮3. 粘贴自己刚才复制的网址&#xff08;组合键&#xff1a;Ctrlv&#xff09;并点击确定即可4.让博客链接显示中文5.点击蓝字即可打开 1.复制自己要添…...

JS通过递归函数来剔除树结构特定节点

最近在处理权限类问题过程中&#xff0c;遇到多次需要过滤一下来列表的数据&#xff0c;针对不同用户看到的数据不同。记录一下 我的数据大致是这样的&#xff1a; class UserTree {constructor() {this.userTreeData [// 示例数据{ nodeid: "1", nodename: "R…...

javayufa

1.变量、运算符、表达式、输入输出 编写一个简单的Java程序–手速练习 public class Main { public static void main(String[] args) { System.out.println("Hello World"); } } 三、语法基础 变量 变量必须先定义&#xff0c;才可以使用。不能重名。 变量定义的方…...

软考-高级系统分析师知识点-补充篇

云计算 云计算的体系结构由5部分组成&#xff0c;分别为应用层&#xff0c;平台层&#xff0c;资源层&#xff0c;用户访问层和管理层&#xff0c;云计算的本质是通过网络提供服务&#xff0c;所以其体系结构以服务为核心。 系统的可靠性技术---容错技术---冗余技术 容错是指系…...

JavaScript全面指南(四)

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;JavaScript篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来JavaScript篇专栏内容:JavaScript全面指南 目录 61、如何防止XSRF攻击 62、如何判断一个对象是否为数组&…...

2024年诺贝尔物理学奖的创新之举

对于2024年诺贝尔物理学奖的这一创新之举&#xff0c;我的观点可以从以下几点展开&#xff1a; 跨学科融合的里程碑&#xff1a;将诺贝尔物理学奖颁发给机器学习与神经网络领域的研究者&#xff0c;标志着科学界对跨学科合作和融合的认可达到新高度。这不仅体现了理论物理与计算…...

FileLink内外网文件交换——致力企业高效安全文件共享

随着数字化转型的推进&#xff0c;企业之间的文件交流需求日益增加。然而&#xff0c;传统的文件传输方式往往无法满足速度和安全性的双重要求。FileLink作为一款专注于跨网文件交换的工具&#xff0c;致力于为企业提供高效、安全的文件共享解决方案。 应用场景一&#xff1a;项…...

使用Python在Jupyter Notebook中显示Markdown文本

使用Python在Jupyter Notebook中显示Markdown文本 引言1. 导入必要的模块2. 定义一个函数来显示Markdown文本3. 使用print_md函数显示Markdown文本4. 总结 引言 作为一名Python初级程序员&#xff0c;你可能已经熟悉了Jupyter Notebook这个强大的工具。Jupyter Notebook不仅支…...

G1 GAN生成MNIST手写数字图像

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 G1 GAN生成MNIST手写数字图像 1. 生成对抗网络 (GAN) 简介 生成对抗网络 (GAN) 是一种通过“对抗性”学习生成数据的深度学习模型&#xff0c;通常用于生成…...

WPFDeveloper正式版发布

WPFDeveloper WPFDeveloper一个基于WPF自定义高级控件的WPF开发人员UI库&#xff0c;它提供了众多的自定义控件。 该项目的创建者和主要维护者是现役微软MVP 闫驚鏵: https://github.com/yanjinhuagood 该项目还有众多的维护者&#xff0c;详情可以访问github上的README&…...

实现鼠标经过某个元素时弹出提示框(通常称为“工具提示”或“悬浮提示”)

要实现鼠标经过某个元素时弹出提示框&#xff08;通常称为“工具提示”或“悬浮提示”&#xff09;&#xff0c;你可以使用 JavaScript 结合 CSS 来创建这个效果。以下是详细步骤&#xff0c;包括 HTML、CSS 和 JavaScript 的代码示例。 HTML 结构 首先&#xff0c;创建一个简…...

【GAMES101笔记速查——Lecture 17 Materials and Appearances】

目录 1 材质和外观 1.1 自然界中&#xff0c;外观是光线和材质共同作用的结果 1.2 图形学中&#xff0c;什么是材质&#xff1f; 1.2.1 渲染方程严格正确&#xff0c;其中BRDF项决定了物体的材质 1.2.2 漫反射材质 &#xff08;1&#xff09;如何定义漫反射系数&#xff1…...

对于从vscode ssh到virtualBox的timeout记录

如题&#xff0c;解决方式如下&#xff1a; 1.把虚拟机关机退出来&#xff0c;在这个界面进行网络设置&#xff1a;选桥接网卡 2.然后再进系统&#xff0c;使用命令 ip addr查看如今的ip地址&#xff0c;应该和在本机里面看到的是一个网段 3.打开vscode&#xff0c;该干啥干…...

鸿蒙原生应用扬帆起航

就在2024年6月21日华为在开发者大会上发布了全新操作的系统HarmonyOS Next开发测试版&#xff0c;网友们把它称之为“称之为纯血鸿蒙”。因为在此之前鸿蒙系统底层式有两套基础架构的&#xff0c;一套是是Android的AOSP&#xff0c;一套是鸿蒙的Open Harmony&#xff0c;因为早…...

《计算机视觉》—— 表情识别

根据计算眼睛、嘴巴的变化&#xff0c;判断是什么表情结合以下两篇文章来理解表情识别的实现方法 基于 dilib 库的人脸检测 https://blog.csdn.net/weixin_73504499/article/details/142977202?spm1001.2014.3001.5501 基于 dlib 库的人脸关键点定位 https://blog.csdn.net/we…...

NVIDIA Aerial Omniverse

NVIDIA Aerial Omniverse 数字孪生助力打造新一代无线网络 文章目录 前言一、从链路级仿真到系统级仿真二、转变无线研发方式1. 开放且可定制的模块化平台2. 适用于 6G 标准化的 3GPP 兼容平台3. 部署前测试4. AI 和 ML 在数字孪生中的应用5. 高级物理精准的电磁求解器6. 合作伙…...

QT程序报错解决方案:Cannot queue arguments of type ‘QTextCharFormat‘ 或 ‘QTextCursor‘

项目场景&#xff1a; 项目场景&#xff1a;基于QT实现的C某程序&#xff0c;搭载在Linux环境中。 问题描述 执行程序时&#xff0c;发现log中报错如下内容&#xff1a; QObject::connect: Cannot queue arguments of type QTextCharFormat (Make sure QTextCharFormat is r…...

MySQL知识点_03

MySQL 命令大全 基础命令 操作命令连接到 MySQL 数据库mysql -u 用户名 -p查看所有数据库SHOW DATABASES;选择一个数据库USE 数据库名;查看所有表SHOW TABLES;查看表结构DESCRIBE 表名; 或 SHOW COLUMNS FROM 表名;创建一个新数据库CREATE DATABASE 数据库名;删除一个数据库D…...

leetcode:744. 寻找比目标字母大的最小字母(python3解法)

难度&#xff1a;简单 给你一个字符数组 letters&#xff0c;该数组按非递减顺序排序&#xff0c;以及一个字符 target。letters 里至少有两个不同的字符。 返回 letters 中大于 target 的最小的字符。如果不存在这样的字符&#xff0c;则返回 letters 的第一个字符。 示例 1&a…...

2015年-2016年 软件工程程序设计题(算法题)实战_c语言程序设计数据结构程序设计分析

文章目录 2015年1.c语言程序设计部分2.数据结构程序设计部分 2016年1.c语言程序设计部分2.数据结构程序设计部分 2015年 1.c语言程序设计部分 1.从一组数据中选择最大的和最小的输出。 void print_maxandmin(double a[],int length) //在一组数据中选择最大的或者最小的输出…...

整理一下实际开发和工作中Git工具的使用 (持续更新中)

介绍一下Git 在实际开发和工作中&#xff0c;Git工具的使用可以说是至关重要的&#xff0c;它不仅提高了团队协作的效率&#xff0c;还帮助开发者有效地管理代码版本。以下是对Git工具使用的扩展描述&#xff1a; 版本控制&#xff1a;Git能够跟踪代码的每一个修改记录&#x…...

Axios 的基本使用与 Fetch 的比较、在 Vue 项目中使用 Axios 的最佳实践

文章目录 1. 引言2. Axios 的基本使用2.1 安装 Axios2.2 发起 GET 请求2.3 发起 POST 请求2.4 请求拦截器2.5 设置全局配置 3. Axios 与 Fetch 的比较3.1 Axios 与 Fetch 的异同点3.2 Fetch 的基本使用3.3 使用 Fetch 处理 POST 请求 4. 讨论在 Vue 项目中使用 Axios 的最佳实践…...

Dockerfile样例

一、基础jar镜像制作 ## Dockerfile FROM registry.openanolis.cn/openanolis/anolisos:8.9 RUN mkdir /work ADD jdk17.tar.gz fonts.tar.gz /work/ RUN yum install fontconfig ttmkfdir -y && yum clean all && \chmod -R 755 /work/fonts ADD fonts.conf …...

MYSQL-多表查询

一、概述 1、定义 多表查询&#xff0c;也称为关联查询&#xff0c;指两个或更多个表一起完成查询操作。 2、前提条件 这些一起查询的表之间是有关系的&#xff08;一对一、一对多&#xff09;&#xff0c;它们之间一定是有关联字段&#xff0c;这个关联字段可能建立了外键…...

MySQL改密码后不生效问题

MySQL修改密码后连接报密码错误 1.mysql修改密码命令&#xff1a; 这两种连接方式密码都必须修改 修改远程连接密码 ALTER USER ‘root’‘%’ IDENTIFIED BY ‘password’; 修改本地连接密码 ALTER USER ‘root’‘localhost’ IDENTIFIED BY ‘password’; 修改完后必须刷新…...

15分钟学Go 第1天:Go语言简介与特点

Go语言简介与特点 1. Go语言概述 Go语言&#xff08;又称Golang&#xff09;是由谷歌于2007年开发并在2009年正式发布的一种开源编程语言。它旨在简单、高效地进行软件开发&#xff0c;尤其适合于网络编程和分布式系统。 1.1 发展背景 多核处理器&#xff1a;随着计算机硬件…...

UDP/TCP协议

网络层只负责将数据包送达至目标主机&#xff0c;并不负责将数据包上交给上层的哪一个应用程序&#xff0c;这是传输层需要干的事&#xff0c;传输层通过端口来区分不同的应用程序。传输层协议主要分为UDP&#xff08;用户数据报协议&#xff09;和TCP&#xff08;传输控制协议…...