如何实现一个分布式锁
如何实现一个分布式锁
本篇内容主要介绍如何使用 Java 语言实现一个注解式的分布式锁,主要是通过注解+AOP 环绕通知来实现。
1. 锁注解
我们首先写一个锁的注解
/*** 分布式锁注解*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface RedisLock {long DEFAULT_TIMEOUT_FOR_LOCK = 5L;long DEFAULT_EXPIRE_TIME = 60L;String key() default "your-biz-key";long expiredTime() default DEFAULT_EXPIRE_TIME;long timeoutForLock() default DEFAULT_TIMEOUT_FOR_LOCK;}
expiredTime 是设置锁的过期时间,timeoutForLock 是设置等待锁的超时时间。如果没有等待获得锁的超时时间这个功能,那么其他线程在获取锁失败时只能直接失败,无法进行排队等待。
我们如何使用这个注解呢,很容易,在需要加锁的业务方法上直接用就行.如下,我们有一个库存服务类,它有一个扣减库存方法,该方法将数据库中的一个库存商品的数量减一。在并发场景下,如果我们没有对其进行资源控制,必然会发生库存扣减不一致现象。
public class StockServiceImpl {@RedisLock(key = "stock-lock", expiredTime = 10L, timeoutForLock = 5L)public void deduct(Long stockId) {Stock stock = this.getById(1L);Integer count = stock.getCount();stock.setCount(count - 1);this.updateById(stock);}
}
2. 在 AOP 切面中进行加锁处理
我们需要使用 AOP 来处理什么?自然是处理使用@RedisLock
的方法,因此我们写一个切点表达式,它匹配所有标有 @RedisLock
注解的方法。
接着,我们将此切点表达式与 @Around
注解结合使用,以创建环绕通知,在目标方法执行前后执行我们的加锁解锁逻辑。
因此,基本的逻辑我们就理清了,代码大致长下面这个样子:
public class RedisLockAspect {private final RedisTemplate<String, Object> redisTemplate;// 锁的redis key前缀private static final String DEFAULT_KEY_PREFIX = "lock:";// 匹配所有标有 @RedisLock 注解的方法@Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")public void lockAnno() {}@Around("lockAnno()")public void invoke(ProceedingJoinPoint joinPoint) throws Exception {// 获取拦截方法上的RedisLock注解RedisLock annotation = getLockAnnotationOnMethod(joinPoint);// 获取锁keyString key = getKey(annotation);// 锁过期时间long expireTime = annotation.expiredTime();// 获取锁的等待时间long timeoutForLock = annotation.timeoutForLock();// 在这里加锁someCodeForLock...// 执行业务joinPoint.proceed();// 在这里解锁someCodeForUnLock...}
我们在加锁的时候,需要用上 timeoutForLock
这个属性,我们通过自旋加线程休眠的方式,来达到在一段时间内等待获取锁的目的。如果自旋时间结束后,还没获取锁,则抛出异常,这里可以根据自己情况而定。自旋加锁代码如下:
// 自旋获取锁long endTime = System.currentTimeMillis() + timeoutForLock * 1000;boolean acquired = false;String uuid = UUID.randomUUID().toString();while(System.currentTimeMillis() < endTime) {Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);if (Boolean.TRUE.equals(absent)) {acquired = true;break;} else {// 获取不到锁,尝试休眠100毫秒后重试Thread.sleep(100);}}// 超时未获取到锁, 抛出异常,可根据自己业务而定if (!acquired) {throw new RuntimeException("获取锁异常");}
我们发现上面加锁的时候设置了一个 uuid 作为 value 值,这是为了在锁释放的时候,不误删其他线程上的锁,随后,我们就可以执行被 AOP 切中的方法,执行结束释放锁。代码如下:
try {// 执行业务joinPoint.proceed();} catch (Throwable e) {log.error("业务执行出错!");} finally {// 解锁时进行校验,只删除自己线程加的锁String value = (String) redisTemplate.opsForValue().get(key);if (uuid.equals(value)) {redisTemplate.delete(key);} else {log.warn("锁已过期!");}}
到这里,我们就以注解+AOP 的方式实现了分布式锁的功能。当然,以上只实现了分布式锁的简单功能,还缺少了分布式锁的 key 自动续约防止锁过期功能,以及锁重入功能。
目前,RedisLockAspect
的完整代码如下:
@Component
@Aspect
@Slf4j
@AllArgsConstructor
public class RedisLockAspect {// 匹配所有标有 @RedisLock 注解的方法@Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")public void lockAnno() {}@Around("lockAnno()")public void invoke(ProceedingJoinPoint joinPoint) throws Exception {// 获取拦截方法上的RedisLock注解RedisLock annotation = getLockAnnotationOnMethod(joinPoint);String key = getKey(annotation);// 锁过期时间long expireTime = annotation.expiredTime();// 获取锁的等待时间long timeoutForLock = annotation.timeoutForLock();// 自旋获取锁long endTime = System.currentTimeMillis() + timeoutForLock * 1000;boolean acquired = false;String uuid = UUID.randomUUID().toString();while(System.currentTimeMillis() < endTime) {Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);if (Boolean.TRUE.equals(absent)) {acquired = true;break;} else {// 获取不到锁,尝试休眠100毫秒后重试Thread.sleep(100);}}// 超时未获取到锁, 抛出异常,可根据自己业务而定if (!acquired) {throw new RuntimeException("获取锁异常");}try {// 执行业务joinPoint.proceed();} catch (Throwable e) {log.error("业务执行出错!");} finally {// 解锁时进行校验,只删除自己线程加的锁String value = (String) redisTemplate.opsForValue().get(key);if (uuid.equals(value)) {redisTemplate.delete(key);} else {log.warn("锁已过期!");}}}private String getKey(RedisLock redisLock) {if (Objects.isNull(redisLock)) {return DEFAULT_KEY_PREFIX + "default";}return DEFAULT_KEY_PREFIX + redisLock.key();}private RedisLock getLockAnnotationOnMethod(ProceedingJoinPoint joinPoint) {MethodSignature signature = (MethodSignature) joinPoint.getSignature();Method method = signature.getMethod();return method.getAnnotation(RedisLock.class);}}
3. key 自动续约防止锁过期
我们接着完善该分布式锁,为其添加 key 自动续约防止锁过期的功能。我们的思路与Redission的watch dog类似,开启一个后台线程,来定时检查需要续约的锁。我们如何判断一个锁是否需要续约呢,我们可以简单定义一个续约分界线,比如在锁过期时间的三分之二的时间点及之后,对锁进行续约。
3.1 定义一个续约任务4
我们来定义一个锁续约任务,那我们需要什么信息呢?
我们至少需要锁的 key,锁要设置的过期时间。这是两个最基本的信息。
要判断在锁过期时间的三分之二的时间点及之后进行续约,那么我们还需要记录锁上次续约的时间点。
此外,我们还可以为锁续约任务添加最大续约次数限制,这可以避免某些执行时间特别久的任务不断占用锁。所以我们还需要记录当前锁续约次数和最大续约次数。
对超过最大续约次数的锁的线程,我们直接将其停止,因此我们也记录一下该锁的线程。
结合上面的分析,我们定义的锁续约任务类如下:
public class LockRenewTask {/*** key*/private final String key;/*** 过期时间。单位:秒*/private final long expiredTime;/*** 锁的最大续约次数*/private final int maxRenewCount;/*** 锁的当前续约次数*/private int currentRenewCount;/*** 最新更新时间*/private LocalDateTime latestRenewTime;/*** 业务线程*/private final Thread thread;public LockRenewTask(String key, long expiredTime, int maxRenewCount, Thread thread) {this.key = key;this.expiredTime = expiredTime;this.maxRenewCount = maxRenewCount;this.thread = thread;this.latestRenewTime = LocalDateTime.now();}/*** 是否到达续约时间* @return*/public boolean isTimeToRenew() {LocalDateTime now = LocalDateTime.now();Duration duration = Duration.between(latestRenewTime, now);return duration.toSeconds() >= ((double)(this.expiredTime / 3) * 2);}/*** 是否达到最大续约次数* @return*/public boolean exceedMaxRenewCount() {return this.currentRenewCount >= this.maxRenewCount;}public synchronized void renew() {this.currentRenewCount++;this.latestRenewTime = LocalDateTime.now();}// 取消业务方法public void cancel() {thread.interrupt();}public String getKey() {return key;}public long getExpiredTime() {return expiredTime;}
}
我们添��了一些关于锁续约的方法:
isTimeToRenew()
: 判断是否可以对锁进行续约exceedMaxRenewCount()
: 判断是否达到最大续约次数renew()
: 来标记一次续约操作cancel()
: 取消业务方法
3.2 定义一个锁续约任务处理器
接着,我们定义一个定时执行该续约任务的 handler
。该 handler
也比较简答,核心逻辑是持有一个类型为 List<LockRenewTask>
的 taskList
来添加续约任务,且使用一个 ScheduledExecutorService
来定时遍历该 taskList
来执行续约任务。该 handler
再对外暴露一个 addRenewTask
方法,方便外部调用来添加续约任务到 taskList
中。
@Slf4j
@Component
public class LockRenewHandler {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 保障对 taskList的添加删除操作是线程安全的*/private final ReentrantLock taskListLock = new ReentrantLock();private final List<LockRenewTask> taskList = new ArrayList<>();private final ScheduledExecutorService taskExecutorService;{taskExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());taskExecutorService.scheduleAtFixedRate(() -> {try {executeRenewTask();} catch (Exception e) {//错误处理}}, 1, 2, TimeUnit.SECONDS);}/*** 添加续约任务*/public void addRenewTask(LockRenewTask task) {taskListLock.lock();try {taskList.add(task);} finally {taskListLock.unlock();}}/*** 执行续约任务*/private void executeRenewTask() {log.info("开始执行续约任务");if (CollectionUtils.isEmpty(taskList)) {return;}// 需要删除的任务,暂存这个集合中 取消List<LockRenewTask> cancelTask = new ArrayList<>();// 获取任务副本List<LockRenewTask> copyTaskList = new ArrayList<>(taskList);for (LockRenewTask task : copyTaskList) {try {// 判断 Redis 中是否存在 keyif (!redisTemplate.hasKey(task.getKey())) {cancelTask.add(task);continue;}// 大于等于最大续约次数if (task.exceedMaxRenewCount()) {// 停止续约任务task.cancel();cancelTask.add(task);continue;}// 到达续约时间if (task.isTimeToRenew()) {log.info("续约任务:{}", task.getKey());redisTemplate.expire(task.getKey(), task.getExpiredTime(), TimeUnit.SECONDS);task.renew();}} catch (Exception e) {//错误处理log.error("处理任务出错:{}", task);}}// 加锁,删除 taskList 中需要移除的任务taskListLock.lock();try {taskList.removeAll(cancelTask);// 清理cancelTask,避免堆积,产生内存泄露cancelTask.clear();} finally {taskListLock.unlock();}}
}
总结一下 LockRenewHandler的主要作用:它负责管理和执行续约任务,以延长 Redis 中键的过期时间。
- 添加续约任务:
addRenewTask()
方法允许添加新的续约任务到内部列表taskList
中。 - 执行续约任务:
executeRenewTask()
方法定期执行续约任务。它检查每个任务的状态,并根据需要续约Redis
中的键。 - 移除完成的任务:维护一个
cancelTask
列表,用于存储需要从taskList
中移除的任务。在executeRenewTask()
方法中,它会将完成的任务添加到cancelTask
列表中,并在之后将其从taskList
中移除。
大概的工作流程如下:
-
续约任务被添加到
taskList
中。 -
executeRenewTask()
方法定期执行,它检查每个任务的状态:- 如果
Redis
中不再存在该键,则取消任务。 - 如果任务的续约次数达到上限,则取消任务。
- 如果是时候续约了,则续约 Redis 中的键并更新任务的续约次数,记录续约时间点。
- 如果
-
完成的任务被添加到
cancelTask
列表中。 -
executeRenewTask()
方法获取taskList
的副本,并从副本中移除cancelTask
中的任务,并且在完成移除任务操作后清空cancelTask
。 -
更新后的
taskList
被保存回类中。
两个需要注意的点
- 我们遍历
taskList
时拷贝了一份副本进行遍历,因为taskList
是可变的,这样可以避免在遍历的时候产生并发修改问题。 cancelTask
需要清理,避免产生内存泄漏。
通过这种方式,LockRenewHandler
可以确保 Redis
中的键在需要时得到续约,并自动移除完成或失败的任务。
3.3 添加锁续约任务
在上面 3.1 节和 3.2 节我们定义好了锁续约任务和处理锁续约任务的核心代码,接下来我们需要在第 2 节加锁解锁的 AOP 处理逻辑上进行一点小小的修改,主要就是在执行加锁之后,执行业务代码之前,添加上锁续约任务。修改位置如下:
public void invoke(ProceedingJoinPoint joinPoint) throws Exception {... // 省略代码try {// 添加锁续约任务LockRenewTask task = new LockRenewTask(key, annotation.expiredTime(), annotation.maxRenew(), Thread.currentThread());lockRenewHandler.addRenewTask(task);log.info("添加续约任务, key:{}", key);// 执行业务joinPoint.proceed();} catch (Throwable e) {log.error("业务执行出错!");} finally {// 解锁时进行校验,只删除自己线程加的锁String value = (String) redisTemplate.opsForValue().get(key);if (uuid.equals(value)) {redisTemplate.delete(key);} else {log.warn("锁已过期!");}}... // 省略代码
}
到这里,我们的分布式锁已经相当完善了,把锁自动续约的功能也加上了。当然,还没有实现锁的可重入性。
相关文章:

如何实现一个分布式锁
如何实现一个分布式锁 本篇内容主要介绍如何使用 Java 语言实现一个注解式的分布式锁,主要是通过注解AOP 环绕通知来实现。 1. 锁注解 我们首先写一个锁的注解 /*** 分布式锁注解*/ Retention(RetentionPolicy.RUNTIME) Target({ElementType.METHOD}) Documente…...

Ajax从零到实战
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 非常期待和您一起在这个小…...

编程参考 - 在C++移动构造函数声明中使用noexcept
在 C 中,noexcept 是用于表示函数不抛出异常的指定符。它既可用于常规函数,也可用于特殊成员函数,包括构造函数和析构函数。使用 noexcept 可以帮助编译器进行优化,提高代码的安全性和正确性。 In C, noexcept is a specifier use…...

Vue2/Vue3实现全局/局部添加防篡改水印的效果。删除元素无效!更改元素属性无效!支持图片、元素、视频等等。
水印目的 版权保护:水印可以在图片、文档或视频中嵌入作者、品牌或版权所有者的信息,以防止未经授权的复制、传播或使用。当其他人使用带有水印的内容时,可以追溯到原始作者或版权所有者,从而加强版权保护。 身份识别:水印可以用作作者或品牌的标识符,使观众能够轻松识…...

GuLi商城-商品服务-API-属性分组-获取分类属性分组
获取分类属性分组接口开发 操作的是这张表 造数据: 后台代码: @Override public PageUtils queryPage(Map<String, Object> params, Long catelogId) {//select * from pms_attr_group where catelog_id=? and (attr_group_id=key or attr_group_name like %key%)Stri…...

安全测试理论
安全测试理论 什么是安全测试? 安全测试:发现系统安全隐患的过程安全测试与传统测试区别 传统测试:发现bug为目的 安全测试:发现系统安全隐患什么是渗透测试 渗透测试:已成功入侵系统为目标的的攻击过程渗透测试与安全…...

序列化和反序列化
面试题:对序列化和反序列化的理解? 我们之所以需要序列化,它核心的目的是为了解决网络通信之间的对象传输的问题,也就是说,如何把当前JVM进程的一个对象,通过跨网络传输到另一个JVM进程里面,而序…...

OpenCV中使用Canny算法在图像中查找边缘
操作系统:ubuntu22.04OpenCV版本:OpenCV4.9IDE:Visual Studio Code编程语言:C11 算法描述 Canny算法是一种广泛应用于计算机视觉和图像处理领域中的边缘检测算法。它由John F. Canny在1986年提出,旨在寻找给定噪声条件下的最佳边…...

基于springboot+vue+uniapp的机电公司管理信息系统
开发语言:Java框架:springbootuniappJDK版本:JDK1.8服务器:tomcat7数据库:mysql 5.7(一定要5.7版本)数据库工具:Navicat11开发软件:eclipse/myeclipse/ideaMaven包&#…...

电子期刊制作实战教程:从零开始制作
随着互联网的普及,电子期刊已经成为了信息传递的重要载体。它以便捷、环保、互动性强等特点受到了越来越多人的青睐。那么,如何从零开始制作一份吸引人的电子期刊呢? 1.要制作电子杂志,首先需要选择一款适合自己的软件。比如FLBOOK在线制作…...

11.FreeRTOS_事件组
事件组概述 事件组的作用: 可以等待某一个事件发生可以等待若干个事件发生可以等待若干个事件中的某一个事件发生 同步点是事件组的另一个使用方式,它可以让多个任务进行阻塞等待,当全部事件完成后,再一起解除任务的阻塞。常常…...

Python爬虫-爬取三国演义文本数据-bs4
bs4进行数据解析 -数据解析的原理: - 1.标签定位 -2.提取标签、标签属性中存储的数据值 - bs4数据解析的原理: - 1.实例化一个BeautifulSoup对象,并且将页面源码数据加载到该对象中 -2.通过调用BeautifulSoup对象中相关的属性或者方法进行标签定位和数据提取 - 环境安装: - pi…...

html5——列表、表格
目录 列表 无序列表 有序列表 自定义列表 表格 基本结构 示例 表格的跨列 表格的跨行 列表 无序列表 <ul>【声明无序列表】 <li>河间驴肉火烧</li>【声明列表项】 <li>唐山棋子烧饼</li> <li>邯郸豆沫</li> <l…...

【Python字符串攻略】:玩转文字,编织程序的叙事艺术
文章目录 🚀一.字符串基础🌈二.查看数据类型⭐三.转化❤️四.字符串索引🚲五.字符串切片🎬六.字符串切片-步长☔七.反向切片注意事项🚲八.字符串💥查💥改💥删 ❤️九.字符串拼接&…...

element form表单中密码框被自动赋值,并默认背景色为白色,手动输值后背景色才是自己配置的背景色,与表单的自动填充有关
事件背景: 一个表单,有两组需要输入密码的地方,两组都被填充用户名密码,其中一组是其他信息,不是用户名密码,也被填充了,且input背景色是白色,表单中的input已经手动配置为无背景色&…...

【UE5.1 角色练习】15-枪械射击——子弹发射物
目录 效果 步骤 一、创建并发射子弹 二、优化子弹 效果 步骤 一、创建并发射子弹 1. 在前面的文章中(【UE5.1 角色练习】06-角色发射火球-part1)我们创建了蓝图“BP_Skill_FireBall” 这里我们复制一份命名为“BP_Ammo_5mm”,用于表示…...

Zynq7000系列FPGA中的DMA控制器的编程限制
有关DMAC编程时适用的限制信息,有四个考虑因素: 固定非对齐突发Endian swap size restrictions:在数据传输或处理过程中,不同字节序(Endian)之间的转换和对应的限制在DMA周期内更新通道控制寄存器当MFIFO满…...

超简易高效的 AI绘图工具—与sd-webui一致界面,6G显存最高提升75%出图速率!(附安装包)
大家好,我是灵魂画师向阳 今天给大家分享一个基于Stable Diffusion WebUI 构建的AI绘图工具—sd-webui-forge,该工具的目标在于简化插件开发,优化资源管理,加速推理。 Forge承诺永远不会对Stable Diffusion WebUI用户界面添加不…...

ArduPilot开源代码之OpticalFlow_backend
ArduPilot开源代码之OpticalFlow_backend 1. 源由2. Library设计3. 重要例程3.1 OpticalFlow_backend::_update_frontend3.2 OpticalFlow_backend::_applyYaw 4. 总结5. 参考资料 1. 源由 光流计是一种低成本定位传感器,所有的光流计设备传感驱动代码抽象公共部分统…...

设计模式探索:适配器模式
1. 适配器模式介绍 1.1 适配器模式介绍 适配器模式(adapter pattern)的原始定义是:将一个类的接口转换为客户期望的另一个接口,适配器可以让不兼容的两个类一起协同工作。 适配器模式的主要作用是把原本不兼容的接口,…...

OpenCV 寻找棋盘格角点及绘制
目录 一、概念 二、代码 2.1实现步骤 2.2完整代码 三、实现效果 一、概念 寻找棋盘格角点(Checkerboard Corners)是计算机视觉中相机标定(Camera Calibration)过程的重要步骤。 OpenCV 提供了函数 cv2.findChessboardCorners…...

【深度学习】PyTorch深度学习笔记02-线性模型
1. 监督学习 2. 数据集的划分 3. 平均平方误差MSE 4. 线性模型Linear Model - y x * w 用穷举法确定线性模型的参数 import numpy as np import matplotlib.pyplot as pltx_data [1.0, 2.0, 3.0] y_data [2.0, 4.0, 6.0]def forward(x):return x * wdef loss(x, y):y_pred…...

10.FreeRTOS_互斥量
互斥量概述 在博文“ FreeRTOS_信号量 ”中,使用了二进制信号量实现了互斥,保护了串口资源。博文链接如下: FreeRTOS_信号量-CSDN博客 但还是要引入互斥量的概念。互斥量与二进制信号量相比,能够多实现如下两个功能:…...

EtherCAT总线冗余让制造更安全更可靠更智能
冗余定义 什么是总线冗余功能?我们都知道,EtherCAT现场总线具有灵活的拓扑结构,设备间支持线型、星型、树型的连接方式,其中线型结构简单、传输效率高,大多数的现场应用中也是使用这种连接方式,如下图所示…...

Android IdleHandler源码分析
文章目录 Android IdleHandler源码分析概述前提基本用法源码分析添加和删除任务执行任务 应用场景 Android IdleHandler源码分析 概述 IdleHandler是一个接口,它定义在MessageQueue类中,用于在主线程的消息队列空闲时执行一些轻量级的任务。IdleHandle…...

Mac安装stable diffusion 工具
文章目录 1.安装 Homebrew2.安装 stable diffusion webui 的依赖3.下载 stable diffusion webui 代码4.启动 stable diffusion webui 本体5.下载模型6.这里可能会遇到一个clip-vit-large-patch14报错 参考:https://brew.idayer.com/install/stable-diffusion-webui/…...

CVE-2024-6387Open SSH漏洞彻底解决举措(含踩坑内容)
一、漏洞名称 OpenSSH 远程代码执行漏洞(CVE-2024-6387) 二、漏洞概述 Open SSH是基于SSH协议的安全网络通信工具,广泛应用于远程服务器管理、加密文件传输、端口转发、远程控制等多个领域。近日被爆出存在一个远程代码执行漏洞,由于Open SSH服务器端…...

python的简单爬取
需要的第三方模块 requests winr打开命令行输入cmd 简单爬取的基本格式(爬取百度logo为例) import requests url"http://www.baidu.com/img/PCtm_d9c8750bed0b3c7d089fa7d55720d6cf.png" resprequests.get(url)#回应 #保存到本地 with open(&…...

【WEB前端2024】3D智体编程:乔布斯3D纪念馆-第60集-agent训练资讯APP重点推荐AI资讯内容(含视频)
【WEB前端2024】3D智体编程:乔布斯3D纪念馆-第60集-agent训练资讯APP重点推荐AI资讯内容(含视频) 使用dtns.network德塔世界(开源的智体世界引擎),策划和设计《乔布斯超大型的开源3D纪念馆》的系列教程。d…...

【学术会议征稿】第三届智能电网与能源系统国际学术会议
第三届智能电网与能源系统国际学术会议 2024 3rd International Conference on Smart Grid and Energy Systems 第三届智能电网与能源系统国际学术会议(SGES 2024)将于2024年10月25日-27日在郑州召开。 智能电网可以优化能源布局,让现有能源…...