当前位置: 首页 > news >正文

最强分布式锁工具:Redisson

1 Redisson概述

1.1 什么是Redisson?

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。

其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。

Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

一个基于Redis实现的分布式工具,有基本分布式对象和高级又抽象的分布式服务,为每个试图再造分布式轮子的程序员带来了大部分分布式问题的解决办法。

Redisson和Jedis、Lettuce有什么区别?倒也不是雷锋和雷锋塔

Redisson和它俩的区别就像一个用鼠标操作图形化界面,一个用命令行操作文件。Redisson是更高层的抽象,Jedis和Lettuce是Redis命令的封装。

  • Jedis是Redis官方推出的用于通过Java连接Redis客户端的一个工具包,提供了Redis的各种命令支持

  • Lettuce是一种可扩展的线程安全的 Redis 客户端,通讯框架基于Netty,支持高级的 Redis 特性,比如哨兵,集群,管道,自动重新连接和Redis数据模型。Spring Boot 2.x 开始 Lettuce 已取代 Jedis 成为首选 Redis 的客户端。

  • Redisson是架设在Redis基础上,通讯基于Netty的综合的、新型的中间件,企业级开发中使用Redis的最佳范本

Jedis把Redis命令封装好,Lettuce则进一步有了更丰富的Api,也支持集群等模式。但是两者也都点到为止,只给了你操作Redis数据库的脚手架,而Redisson则是基于Redis、Lua和Netty建立起了成熟的分布式解决方案,甚至redis官方都推荐的一种工具集。

2 分布式锁

2.1 分布式锁怎么实现?

分布式锁是并发业务下的刚需,虽然实现五花八门:ZooKeeper有Znode顺序节点,数据库有表级锁和乐/悲观锁,Redis有setNx,但是殊途同归,最终还是要回到互斥上来,本篇介绍Redisson,那就以redis为例。

2.2 怎么写一个简单的Redis分布式锁?

以Spring Data Redis为例,用RedisTemplate来操作Redis(setIfAbsent已经是setNx + expire的合并命令),如下:

// 加锁
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}
// 解锁,防止删错别人的锁,以uuid为value校验是否自己的锁
public void unlock(String lockName, String uuid) {if(uuid.equals(redisTemplate.opsForValue().get(lockName)){        redisTemplate.opsForValue().del(lockName);    }
}// 结构
if(tryLock){// todo
}finally{unlock;
}

简单1.0版本完成,聪明的小张一眼看出,这是锁没错,但get和del操作非原子性,并发一旦大了,无法保证进程安全。于是小张提议,用Lua脚本

2.3 Lua脚本是什么?

Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过redis的eval/evalsha命令来运行,把操作封装成一个Lua脚本,如论如何都是一次执行的原子操作。

于是2.0版本通过Lua脚本删除

lockDel.lua如下:

if redis.call('get', KEYS[1]) == ARGV[1] then -- 执行删除操作return redis.call('del', KEYS[1]) else -- 不成功,返回0return 0 
end

delete操作时执行Lua命令:

// 解锁脚本
DefaultRedisScript<Object> unlockScript = new DefaultRedisScript();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));// 执行lua脚本解锁
redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);

2.0似乎更像一把锁,但好像又缺少了什么,小张一拍脑袋,synchronized和ReentrantLock都很丝滑,因为他们都是可重入锁,一个线程多次拿锁也不会死锁,我们需要可重入。

2.4 怎么保证可重入?

重入就是,同一个线程多次获取同一把锁是允许的,不会造成死锁,这一点synchronized偏向锁提供了很好的思路,synchronized的实现重入是在JVM层面,JAVA对象头MARK WORD中便藏有线程ID和计数器来对当前线程做重入判断,避免每次CAS。

当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储偏向的线程ID,以后该线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需简单测试一下对象头的Mark Word里是否存储着指向当前线程的偏向锁。如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下Mark Word中偏向锁标志是否设置成1:没有则CAS竞争;设置了,则CAS将对象头偏向锁指向当前线程。

再维护一个计数器,同个线程进入则自增1,离开再减1,直到为0才能释放

2.5 可重入锁

仿造该方案,我们需改造Lua脚本:

1.需要存储 锁名称lockName、获得该锁的线程id和对应线程的进入次数count

2.加锁

每次线程获取锁时,判断是否已存在该锁

  • 不存在

  • 设置hash的key为线程id,value初始化为1

  • 设置过期时间

  • 返回获取锁成功true

  • 存在

  • 继续判断是否存在当前线程id的hash key

  • 存在,线程key的value + 1,重入次数增加1,设置过期时间

  • 不存在,返回加锁失败

3.解锁

每次线程来解锁时,判断是否已存在该锁

  • 存在

  • 是否有该线程的id的hash key,有则减1,无则返回解锁失败

  • 减1后,判断剩余count是否为0,为0则说明不再需要这把锁,执行del命令删除

2.5.1 存储结构

为了方便维护这个对象,我们用Hash结构来存储这些字段。Redis的Hash类似Java的HashMap,适合存储对象。

hset lockname1 threadId 1

设置一个名字为lockname1的hash结构,该hash结构key为threadId,值value为1

hget lockname1 threadId

获取lockname1的threadId的值

存储结构为:

lockname 锁名称key1:   threadId   唯一键,线程idvalue1:  count     计数器,记录该线程获取锁的次数

redis中的结构

2.5.2 计数器的加减

当同一个线程获取同一把锁时,我们需要对对应线程的计数器count做加减

判断一个redis key是否存在,可以用exists,而判断一个hash的key是否存在,可以用hexists

而redis也有hash自增的命令hincrby

每次自增1时 hincrby lockname1 threadId 1,自减1时 hincrby lockname1 threadId -1

2.5.3 解锁的判断

当一把锁不再被需要了,每次解锁一次,count减1,直到为0时,执行删除

综合上述的存储结构和判断流程,加锁和解锁Lua如下

加锁 lock.lua:

local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];-- lockname不存在
if(redis.call('exists', key) == 0) thenredis.call('hset', key, threadId, '1');redis.call('expire', key, releaseTime);return 1;
end;-- 当前线程已id存在
if(redis.call('hexists', key, threadId) == 1) thenredis.call('hincrby', key, threadId, '1');redis.call('expire', key, releaseTime);return 1;
end;
return 0;

解锁 unlock.lua:

local key = KEYS[1];
local threadId = ARGV[1];-- lockname、threadId不存在
if (redis.call('hexists', key, threadId) == 0) thenreturn nil;
end;-- 计数器-1
local count = redis.call('hincrby', key, threadId, -1);-- 删除lock
if (count == 0) thenredis.call('del', key);return nil;
end;

代码:

/*** @description 原生redis实现分布式锁**/
@Getter
@Setter
public class RedisLock {private RedisTemplate redisTemplate;private DefaultRedisScript<Long> lockScript;private DefaultRedisScript<Object> unlockScript;public RedisLock(RedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// 加载加锁的脚本lockScript = new DefaultRedisScript<>();this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));this.lockScript.setResultType(Long.class);// 加载释放锁的脚本unlockScript = new DefaultRedisScript<>();this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));}/*** 获取锁*/public String tryLock(String lockName, long releaseTime) {// 存入的线程信息的前缀String key = UUID.randomUUID().toString();// 执行脚本Long result = (Long) redisTemplate.execute(lockScript,Collections.singletonList(lockName),key + Thread.currentThread().getId(),releaseTime);if (result != null && result.intValue() == 1) {return key;} else {return null;}}/*** 解锁* @param lockName* @param key*/public void unlock(String lockName, String key) {redisTemplate.execute(unlockScript,Collections.singletonList(lockName),key + Thread.currentThread().getId());}
}

至此已经完成了一把分布式锁,符合互斥、可重入、防死锁的基本特点。

严谨的小张觉得虽然当个普通互斥锁,已经稳稳够用,可是业务里总是又很多特殊情况的,比如A进程在获取到锁的时候,因业务操作时间太长,锁释放了但是业务还在执行,而此刻B进程又可以正常拿到锁做业务操作,两个进程操作就会存在依旧有共享资源的问题

而且如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态

小张不是杠精,因为库存操作总有这样那样的特殊。

所以我们希望在这种情况时,可以延长锁的releaseTime延迟释放锁来直到完成业务期望结果,这种不断延长锁过期时间来保证业务执行完成的操作就是锁续约。

读写分离也是常见,一个读多写少的业务为了性能,常常是有读锁和写锁的。

而此刻的扩展已经超出了一把简单轮子的复杂程度,光是处理续约,就够小张喝一壶,何况在性能(锁的最大等待时间)、优雅(无效锁申请)、重试(失败重试机制)等方面还要下功夫研究。

在小张苦思冥想时,旁边的小白凑过来看了看小张,很好奇,都2021年了,为什么不直接用redisson呢?

Redisson就有这把你要的锁。

3 Redisson分布式锁

号称简单的Redisson分布式锁的使用姿势是什么?

3.1 依赖

<!-- 原生,本章使用-->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency><!-- 另一种Spring集成starter,本章未使用 -->
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.13.6</version>
</dependency>

3.2 配置

@Configuration
public class RedissionConfig {@Value("${spring.redis.host}")private String redisHost;@Value("${spring.redis.password}")private String password;private int port = 6379;@Beanpublic RedissonClient getRedisson() {Config config = new Config();config.useSingleServer().setAddress("redis://" + redisHost + ":" + port).setPassword(password);config.setCodec(new JsonJacksonCodec());return Redisson.create(config);}
}

3.3 启用分布式锁

@Resource
private RedissonClient redissonClient;RLock rLock = redissonClient.getLock(lockName);
try {boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);if (isLocked) {// TODO}} catch (Exception e) {rLock.unlock();}

简洁明了,只需要一个RLock,既然推荐Redisson,就往里面看看他是怎么实现的。

4 RLock

RLock是Redisson分布式锁的最核心接口,继承了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的返回值都是RFuture,是Redisson执行异步实现的核心逻辑,也是Netty发挥的主要阵地。

RLock如何加锁?

从RLock进入,找到RedissonLock类,找到tryLock方法再递进到干事的tryAcquireOnceAsync方法,这是加锁的主要代码(版本不一此处实现有差别,和最新3.15.x有一定出入,但是核心逻辑依然未变。此处以3.13.6为例)

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}}

此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog锁续约(下文),一个注册了加锁事件的续约任务。我们先来看有过期时间tryLockInnerAsync部分,

evalWriteAsync是eval命令执行lua的入口

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime);return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});}

这里揭开真面目,eval命令执行Lua脚本的地方,此处的Lua脚本展开

-- 不存在该key时
if (redis.call('exists', KEYS[1]) == 0) then -- 新增该锁并且hash中该线程id对应的count置1redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 设置过期时间redis.call('pexpire', KEYS[1], ARGV[1]); return nil; 
end; -- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 线程重入次数++redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; 
end; 
return redis.call('pttl', KEYS[1]);

和前面我们写自定义的分布式锁的脚本几乎一致,看来redisson也是一样的实现,具体参数分析:

// keyName
KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
ARGV[1] = this.internalLockLeaseTime
// uuid+threadId组合的唯一值
ARGV[2] = this.getLockName(threadId)

总共3个参数完成了一段逻辑:

判断该锁是否已经有对应hash表存在,

• 没有对应的hash表:则set该hash表中一个entry的key为锁名称,value为1,之后设置该hash表失效时间为leaseTime

• 存在对应的hash表:则将该lockName的value执行+1操作,也就是计算进入次数,再设置失效时间leaseTime

• 最后返回这把锁的ttl剩余时间

也和上述自定义锁没有区别

既然如此,那解锁的步骤也肯定有对应的-1操作,再看unlock方法,同样查找方法名,一路到

protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});}

掏出Lua部分

-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;
end;
-- 计数器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then -- 过期时间重设redis.call('pexpire', KEYS[1], ARGV[2]); return 0; 
else-- 删除并发布解锁消息redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1;
end; 
return nil;

该Lua KEYS有2个Arrays.asList(getName(), getChannelName())

name 锁名称
channelName,用于pubSub发布消息的channel名称

ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)

LockPubSub.UNLOCK_MESSAGE,channel发送消息的类别,此处解锁为0

internalLockLeaseTime,watchDog配置的超时时间,默认为30s

lockName 这里的lockName指的是uuid和threadId组合的唯一值

步骤如下:

1.如果该锁不存在则返回nil;

2.如果该锁存在则将其线程的hash key计数器-1,

3.计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;

其中unLock的时候使用到了Redis发布订阅PubSub完成消息通知。

而订阅的步骤就在RedissonLock的加锁入口的lock方法里

long threadId = Thread.currentThread().getId();Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);if (ttl != null) {// 订阅RFuture<RedissonLockEntry> future = this.subscribe(threadId);if (interruptibly) {this.commandExecutor.syncSubscriptionInterrupted(future);} else {this.commandExecutor.syncSubscription(future);}// 省略

当锁被其他线程占用时,通过监听锁的释放通知(在其他线程通过RedissonLock释放锁时,会通过发布订阅pub/sub功能发起通知),等待锁被其他线程释放,也是为了避免自旋的一种常用效率手段。

4.1 解锁消息

为了一探究竟通知了什么,通知后又做了什么,进入LockPubSub。

这里只有一个明显的监听方法onMessage,其订阅和信号量的释放都在父类PublishSubscribe,我们只关注监听事件的实际操作

protected void onMessage(RedissonLockEntry value, Long message) {Runnable runnableToExecute;if (message.equals(unlockMessage)) {// 从监听器队列取监听线程执行监听回调runnableToExecute = (Runnable)value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}// getLatch()返回的是Semaphore,信号量,此处是释放信号量// 释放信号量后会唤醒等待的entry.getLatch().tryAcquire去再次尝试申请锁value.getLatch().release();} else if (message.equals(readUnlockMessage)) {while(true) {runnableToExecute = (Runnable)value.getListeners().poll();if (runnableToExecute == null) {value.getLatch().release(value.getLatch().getQueueLength());break;}runnableToExecute.run();}}}

发现一个是默认解锁消息,一个是读锁解锁消息,因为redisson是有提供读写锁的,而读写锁读读情况和读写、写写情况互斥情况不同,我们只看上面的默认解锁消息unlockMessage分支

LockPubSub监听最终执行了2件事

  1. runnableToExecute.run() 执行监听回调

  1. value.getLatch().release(); 释放信号量

Redisson通过LockPubSub监听解锁消息,执行监听回调和释放信号量通知等待线程可以重新抢锁。

这时再回来看tryAcquireOnceAsync另一分支

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}}

可以看到,无超时时间时,在执行加锁操作后,还执行了一段费解的逻辑

ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);}}})                   }                 }             }) 

此处涉及到Netty的Future/Promise-Listener模型,Redisson中几乎全部以这种方式通信(所以说Redisson是基于Netty通信机制实现的),理解这段逻辑可以试着先理解

在 Java 的 Future 中,业务逻辑为一个 Callable 或 Runnable 实现类,该类的 call()或 run()执行完毕意味着业务逻辑的完结,在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败,这样更加方便的监控自己的业务逻辑。

这块代码的表面意义就是,在执行异步加锁的操作后,加锁成功则根据加锁完成返回的ttl是否过期来确认是否执行一段定时任务。

这段定时任务的就是watchDog的核心。

4.2 锁续约

查看RedissonLock.this.scheduleExpirationRenewal(threadId)

private void scheduleExpirationRenewal(long threadId) {RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);this.renewExpiration();}}private void renewExpiration() {RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (ee != null) {Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());if (ent != null) {Long threadId = ent.getFirstThreadId();if (threadId != null) {RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);} else {if (res) {RedissonLock.this.renewExpiration();}}});}}}}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);ee.setTimeout(task);}}

拆分来看,这段连续嵌套且冗长的代码实际上做了几步

• 添加一个netty的Timeout回调任务,每(internalLockLeaseTime / 3)毫秒执行一次,执行的方法是renewExpirationAsync

• renewExpirationAsync重置了锁超时时间,又注册一个监听器,监听回调又执行了renewExpiration

renewExpirationAsync 的Lua如下

protected RFuture<Boolean> renewExpirationAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});}if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; 
end; 
return 0;

重新设置了超时时间。

Redisson加这段逻辑的目的是什么?

目的是为了某种场景下保证业务不影响,如任务执行超时但未结束,锁已经释放的问题。

当一个线程持有了一把锁,由于并未设置超时时间leaseTime,Redisson默认配置了30S,开启watchDog,每10S对该锁进行一次续约,维持30S的超时时间,直到任务完成再删除锁。

这就是Redisson的锁续约,也就是WatchDog实现的基本思路。

4.3 流程概括

通过整体的介绍,流程简单概括:

  1. A、B线程争抢一把锁,A获取到后,B阻塞

  1. B线程阻塞时并非主动CAS,而是PubSub方式订阅该锁的广播消息

  1. A操作完成释放了锁,B线程收到订阅消息通知

  1. B被唤醒开始继续抢锁,拿到锁

详细加锁解锁流程总结如下图:

5 公平锁

以上介绍的可重入锁是非公平锁,Redisson还基于Redis的队列(List)和ZSet实现了公平

5.1 公平的定义是什么?

公平就是按照客户端的请求先来后到排队来获取锁,先到先得,也就是FIFO,所以队列和容器顺序编排必不可少

5.2 FairSync

回顾JUC的ReentrantLock公平锁的实现

/*** Sync object for fair locks*/
static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire.  Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
}

AQS已经提供了整个实现,是否公平取决于实现类取出节点逻辑是否顺序取

AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架,通过内置FIFO队列来完成资源获取线程的排队工作,他自身没有实现同步接口,仅仅定义了若干同步状态获取和释放的方法来供自定义同步组件使用(上图),支持独占和共享获取,这是基于模版方法模式的一种设计,给公平/非公平提供了土壤。

我们用2张图来简单解释AQS的等待流程(出自《JAVA并发编程的艺术》)

一张是同步队列(FIFO双向队列)管理 获取同步状态失败(抢锁失败)的线程引用、等待状态和前驱后继节点的流程图

一张是独占式获取同步状态的总流程,核心acquire(int arg)方法调用流程

可以看出锁的获取流程

AQS维护一个同步队列,获取状态失败的线程都会加入到队列中进行自旋,移出队列或停止自旋的条件是前驱节点为头节点切成功获取了同步状态。

而比较另一段非公平锁类NonfairSync可以发现,控制公平和非公平的关键代码,在于hasQueuedPredecessors方法。

static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock.  Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
}

NonfairSync减少了了hasQueuedPredecessors判断条件,该方法的作用就是

查看同步队列中当前节点是否有前驱节点,如果有比当前线程更早请求获取锁则返回true。

保证每次都取队列的第一个节点(线程)来获取锁,这就是公平规则

5.2.1 为什么JUC以默认非公平锁呢?

因为当一个线程请求锁时,只要获取来同步状态即成功获取。在此前提下,刚释放的线程再次获取同步状态的几率会非常大,使得其他线程只能在同步队列中等待。但这样带来的好处是,非公平锁大大减少了系统线程上下文的切换开销。

可见公平的代价是性能与吞吐量。

Redis里没有AQS,但是有List和zSet,看看Redisson是怎么实现公平的。

5.3 RedissonFairLock

RedissonFairLock 用法依然很简单

RLock fairLock = redissonClient.getFairLock(lockName);
fairLock.lock();

RedissonFairLock继承自RedissonLock,同样一路向下找到加锁实现方法tryLockInnerAsync

这里有2段冗长的Lua,但是Debug发现,公平锁的入口在 command == RedisCommands.EVAL_LONG 之后,此段Lua较长,参数也多,我们着重分析Lua的实现规则

参数:

-- lua中的几个参数
KEYS = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
KEYS[1]: lock_name, 锁名称                   
KEYS[2]: "redisson_lock_queue:{xxx}"  线程队列
KEYS[3]: "redisson_lock_timeout:{xxx}"  线程id对应的超时集合ARGV =  internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime
ARGV[1]: "{leaseTime}" 过期时间
ARGV[2]: "{Redisson.UUID}:{threadId}"   
ARGV[3] = 当前时间 + 线程等待时间:(10:00:00) + 5000毫秒 = 10:00:05
ARGV[4] = 当前时间(10:00:00)  部署服务器时间,非redis-server服务器时间

公平锁实现的Lua脚本

-- 1.死循环清除过期key
while true do -- 获取头节点local firstThreadId2 = redis.call('lindex', KEYS[2], 0);-- 首次获取必空跳出循环if firstThreadId2 == false then break;end;-- 清除过期keylocal timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));if timeout <= tonumber(ARGV[4]) thenredis.call('zrem', KEYS[3], firstThreadId2);redis.call('lpop', KEYS[2]);elsebreak;end;
end;-- 2.不存在该锁 && (不存在线程等待队列 || 存在线程等待队列而且第一个节点就是此线程ID),加锁部分主要逻辑
if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0)  or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then-- 弹出队列中线程id元素,删除Zset中该线程id对应的元素redis.call('lpop', KEYS[2]);redis.call('zrem', KEYS[3], ARGV[2]);local keys = redis.call('zrange', KEYS[3], 0, -1);-- 遍历zSet所有key,将key的超时时间(score) - 当前时间msfor i = 1, #keys, 1 do redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);end;-- 加锁设置锁过期时间redis.call('hset', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;-- 3.线程存在,重入判断
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 thenredis.call('hincrby', KEYS[1], ARGV[2],1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;-- 4.返回当前线程剩余存活时间
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);if timeout ~= false then-- 过期时间timeout的值在下方设置,此处的减法算出的依旧是当前线程的ttlreturn timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
end;-- 5.尾节点剩余存活时间
local lastThreadId = redis.call('lindex', KEYS[2], -1);
local ttl;
-- 尾节点不空 && 尾节点非当前线程
if lastThreadId ~= false and lastThreadId ~= ARGV[2] then-- 计算队尾节点剩余存活时间ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
else-- 获取lock_name剩余存活时间ttl = redis.call('pttl', KEYS[1]);
end;-- 6.末尾排队
-- zSet 超时时间(score),尾节点ttl + 当前时间 + 5000ms + 当前时间,无则新增,有则更新
-- 线程id放入队列尾部排队,无则插入,有则不再插入
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 thenredis.call('rpush', KEYS[2], ARGV[2]);
end;
return ttl;

5.4 公平锁加锁步骤

通过以上Lua,可以发现,lua操作的关键结构是列表(list)和有序集合(zSet)。

其中list维护了一个等待的线程队列redisson_lock_queue:{xxx},zSet维护了一个线程超时情况的有序集合redisson_lock_timeout:{xxx},尽管lua较长,但是可以拆分为6个步骤

1.队列清理

  • 保证队列中只有未过期的等待线程

2.首次加锁

  • hset加锁,pexpire过期时间

3.重入判断

  • 此处同可重入锁lua

4.返回ttl

5.计算尾节点ttl

  • 初始值为锁的剩余过期时间

6.末尾排队

  • ttl + 2 * currentTime + waitTime是score的默认值计算公式

2.模拟

如果模拟以下顺序,就会明了redisson公平锁整个加锁流程

假设 t1 10:00:00 < t2 10:00:10 < t3 10:00:20

t1:当线程1初次获取锁

1.等待队列无头节点,跳出死循环->2
2.不存在该锁 && 不存在线程等待队列 成立
2.1 lpop和zerm、zincrby都是无效操作,只有加锁生效,说明是首次加锁,加锁后返回nil
加锁成功,线程1获取到锁,结束

t2:线程2尝试获取锁(线程1未释放锁)

1.等待队列无头节点,跳出死循环->2
2.不存在该锁 不成立->3
3.非重入线程 ->4
4.score无值 ->5
5.尾节点为空,设置ttl初始值为lock_name的ttl -> 6
6.按照ttl + waitTime + currentTime + currentTime 来设置zSet超时时间score,并且加入等待队列,线程2为头节点
score = 20S + 5000ms + 10:00:10 + 10:00:10 = 10:00:35 + 10:00:10

t3:线程3尝试获取锁(线程1未释放锁)

1.等待队列有头节点
1.1未过期->2
2.不存在该锁不成立->3
3.非重入线程->4
4.score无值 ->5
5.尾节点不为空 && 尾节点线程为2,非当前线程
5.1取出之前设置的score,减去当前时间:ttl = score - currentTime ->6
6.按照ttl + waitTime + currentTime + currentTime 来设置zSet超时时间score,并且加入等待队列
score = 10S + 5000ms + 10:00:20 + 10:00:20 = 10:00:35 + 10:00:20

如此一来,三个需要抢夺一把锁的线程,完成了一次排队,在list中排列他们等待线程id,在zSet中存放过期时间(便于排列优先级)。其中返回ttl的线程2客户端、线程3客户端将会一直按一定间隔自旋重复执行该段Lua,尝试加锁,如此一来便和AQS有了异曲同工之处。

而当线程1释放锁之后(这里依旧有通过Pub/Sub发布解锁消息,通知其他线程获取)

10:00:30 线程2尝试获取锁(线程1已释放锁)

1.等待队列有头节点,未过期->2
2.不存在该锁 & 等待队列头节点是当前线程 成立
2.1删除当前线程的队列信息和zSet信息,超时时间为:
线程2 10:00:35 + 10:00:10 - 10:00:30 = 10:00:15
线程3 10:00:35 + 10:00:20 - 10:00:30 = 10:00:25
2.2线程2获取到锁,重新设置过期时间
加锁成功,线程2获取到锁,结束

排队结构如图:

公平锁的释放脚本和重入锁类似,多了一步加锁开头的清理过期key的while true逻辑,在此不再展开篇幅描述。

由上可以看出,Redisson公平锁的玩法类似于延迟队列的玩法,核心都在Redis的List和zSet结构的搭配,但又借鉴了AQS实现,在定时判断头节点上如出一辙(watchDog),保证了锁的竞争公平和互斥。并发场景下,lua脚本里,zSet的score很好地解决了顺序插入的问题,排列好优先级。

并且为了防止因异常而退出的线程无法清理,每次请求都会判断头节点的过期情况给予清理,最后释放时通过CHANNEL通知订阅线程可以来获取锁,重复一开始的步骤,顺利交接到下一个顺序线程。

6 总结

Redisson整体实现分布式加解锁流程的实现稍显复杂,作者Rui Gu对Netty和JUC、Redis研究深入,利用了很多高级特性和语义,值得深入学习,本次介绍也只是单机Redis下锁实现。

Redisson也提供了多机情况下的联锁MultiLock:

https://github.com/redisson/redisson/wiki/8.-分布式锁和同步器#81-可重入锁reentrant-lock

和官方推荐的红锁RedLock:

https://github.com/redisson/redisson/wiki/8.-分布式锁和同步器#84-红锁redlock

所以,当你真的需要分布式锁时,不妨先来Redisson里找找。

相关文章:

最强分布式锁工具:Redisson

1 Redisson概述1.1 什么是Redisson&#xff1f;Redisson是一个在Redis的基础上实现的Java驻内存数据网格&#xff08;In-Memory Data Grid&#xff09;。它不仅提供了一系列的分布式的Java常用对象&#xff0c;还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, Sorted…...

Java9-17新特性

Java9-17新特性 一、接口的私有方法 Java8版本接口增加了两类成员&#xff1a; 公共的默认方法公共的静态方法 Java9版本接口又新增了一类成员&#xff1a; 私有的方法 为什么JDK1.9要允许接口定义私有方法呢&#xff1f;因为我们说接口是规范&#xff0c;规范时需要公开…...

电脑开机找不到启动设备怎么办?

电脑正常开机&#xff0c;却提示“找不到启动设备”&#xff0c;这时我们该怎么办呢&#xff1f;本文就为大家介绍几种针对该问题的解决方法&#xff0c;一起来看看吧&#xff01;“找不到启动设备”是什么意思&#xff1f;可引导设备&#xff08;又称启动设备&#xff09;是一…...

使用langchain打造自己的大型语言模型(LLMs)

我们知道Openai的聊天机器人可以回答用户提出的绝大多数问题,它几乎无所不知&#xff0c;无所不能&#xff0c;但是由于有机器人所学习到的是截止到2021年9月以前的知识&#xff0c;所以当用户询问机器人关于2021年9月以后发送的事情时&#xff0c;它无法给出正确的答案&#x…...

assert()宏函数

assert()宏函数 assert是宏&#xff0c;而不是函数。在C的assert.h文件中 #include <assert.h> void assert( int expression );assert的作用是先计算表达式expression&#xff0c; 如果其值为假&#xff08;即为0&#xff09;&#xff0c;那么它会打印出来assert的内容…...

Docker圣经:大白话说Docker底层原理,6W字实现Docker自由

说在前面&#xff1a; 现在拿到offer超级难&#xff0c;甚至连面试电话&#xff0c;一个都搞不到。 尼恩的技术社群&#xff08;50&#xff09;中&#xff0c;很多小伙伴凭借 “左手云原生右手大数据”的绝活&#xff0c;拿到了offer&#xff0c;并且是非常优质的offer&#…...

Redis+Caffeine多级(二级)缓存,让访问速度纵享丝滑

目录多级缓存的引入多级缓存的优势CaffeineRedis实现多级缓存V1.0版本V2.0版本V3.0版本多级缓存的引入 在高性能的服务架构设计中&#xff0c;缓存是一个不可或缺的环节。在实际的项目中&#xff0c;我们通常会将一些热点数据存储到Redis或MemCache这类缓存中间件中&#xff0…...

C#和.net框架之第一弹

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录C# 简介一、微软平台的编程二、使用VS创建第一个c#程序1、第一步2、第二步3、第三步4、第四步5、第五步C# 简介 C# 是一个现代的、通用的、面向对象的编程语言&…...

C++---背包模型---潜水员(每日一道算法2023.3.12)

注意事项&#xff1a; 本题是"动态规划—01背包"和"背包模型—二维费用的背包问题"的扩展题&#xff0c;优化思路不多赘述&#xff0c;dp思路会稍有不同&#xff0c;下面详细讲解。 题目&#xff1a; 潜水员为了潜水要使用特殊的装备。 他有一个带2种气体…...

C++类的成员变量和成员函数详解

类可以看做是一种数据类型,它类似于普通的数据类型,但是又有别于普通的数据类型。类这种数据类型是一个包含成员变量和成员函数的集合。 类的成员变量和普通变量一样,也有数据类型和名称,占用固定长度的内存。但是,在定义类的时候不能对成员变量赋值,因为类只是一种数据类…...

(枚举)(模拟)(位运算)116. 飞行员兄弟

目录 题目链接 一些话 切入点 流程 套路 ac代码 题目链接 116. 飞行员兄弟 - AcWing题库 我草&#xff0c;又~在&#xff5e;水&#xff5e;字&#xff5e;数&#xff5e;啦&#xff01;我草&#xff0c;又~在&#xff5e;水&#xff5e;字&#xff5e;数&#xff5e;啦…...

详解Array.prototype.shift.call(arguments)

经常看到如下代码&#xff1a; function foo() {let k Array.prototype.shift.call(arguments);console.log(k) } foo(11,22) //11 Array.prototype.shift.call(arguments)的作用是&#xff1a; 取 arguments 中的第一个参数 一、为啥要这么写&#xff0c;不直接使用argume…...

Tina_Linux_Wi-Fi_开发指南

Tina Linux Wi-Fi 开发指南 1 前言 1.1 文档简介 介绍Allwinner 平台上Wi-Fi 驱动移植&#xff0c;介绍Tina Wi-Fi 管理框架&#xff0c;包括Station&#xff0c;Ap 以及Wi-Fi 常见问题。 1.2 目标读者 适用Tina 平台的广大客户和对Tina Wi-Fi 感兴趣的同事。 1.3 适用范…...

Spring AOP(AOP概念、组成、Spring AOP实现及实现原理)

文章目录1. Spring AOP 是什么2. 为什么要用 AOP3. 怎么学 Spring AOP4. AOP 组成5. Spring AOP 实现5.1 添加 Spring AOP 框架支持5.2 定义切面和切点5.3 实现通知方法5.4 使⽤ AOP 统计 UserController 每个⽅法的执⾏时间 StopWatch5.4 切点表达式说明 AspectJ6. Spring AOP…...

8.条件渲染指令

目录 1 v-if v-show 2 v-if v-else-if v-else 1 v-if v-show v-if与v-show都可以控制DOM的显示与隐藏 由于flag是布尔值&#xff0c;所以这里可以直接写 v-if"flag" 当flag为true的时候&#xff0c;v-if与v-show控制的div都会被显示出来 当flag为false的时候&a…...

2023年全网最全最细最流行的自动化测试工具有哪些?你都知道吗!

下面就是我个人整理的一些比较常用的自动化测试工具&#xff0c;并且还有视频版本的详细介绍&#xff0c;同时在线学习人数超过1000人&#xff01; B站讲的最详细的Python接口自动化测试实战教程全集&#xff08;实战最新版&#xff09;一&#xff1a;前言 随着测试工程师技能和…...

网络安全——数据链路层安全协议

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​​ 目录 前言 一.数据链路层安全协议简介 1.数据链路安全性 二.局域网数据链路层协议 1.…...

编译原理基础概念

一、什么是编译程序编译程序是一种程序&#xff0c;能够将某一种高级语言编写的源程序改造成另一种低级语言编写的目标程序&#xff0c;他们在逻辑上等价、完成相同的工作二、编译阶段1、当目标程序是机器语言时&#xff0c;编译阶段&#xff1a;&#xff08;1&#xff09;编译…...

蔬菜视觉分拣机器人的设计与实现(RoboWork参赛方案)

蔬菜视觉分拣机器人的设计与实现 文章目录蔬菜视觉分拣机器人的设计与实现1. 技术栈背景2. 整体设计3. 机械结构3.1 整体结构3.2 底座结构3.3 小臂结构3.4 大臂结构3.5 负载组件结构3.6 末端执行器结构4. 硬件部分4.1 视觉系统4.1.1 光源4.1.2 海康工业相机4.2 传送带系统4.2.1…...

【LVGL移植】STM32F1基于STM32CubeMX配置硬件SPI驱动1.8寸TFT ST7735S跑LVGL图形demo

【LVGL移植】STM32F1基于STM32CubeMX配置硬件SPI驱动1.8寸TFT ST7735S屏幕跑LVGL图形demo&#x1f3ac;运行LVGL 按键组件demo ✨基于STM32CubeMX配置工程是因为方便移植&#xff0c;只要是STM32芯片&#xff0c;拿到我的这个工程源码就可以根据自己的stm32芯片&#xff0c;自…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

C++中string流知识详解和示例

一、概览与类体系 C 提供三种基于内存字符串的流&#xff0c;定义在 <sstream> 中&#xff1a; std::istringstream&#xff1a;输入流&#xff0c;从已有字符串中读取并解析。std::ostringstream&#xff1a;输出流&#xff0c;向内部缓冲区写入内容&#xff0c;最终取…...

相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...

Redis数据倾斜问题解决

Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中&#xff0c;部分节点存储的数据量或访问量远高于其他节点&#xff0c;导致这些节点负载过高&#xff0c;影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档&#xff09;&#xff0c;如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下&#xff0c;风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

CSS | transition 和 transform的用处和区别

省流总结&#xff1a; transform用于变换/变形&#xff0c;transition是动画控制器 transform 用来对元素进行变形&#xff0c;常见的操作如下&#xff0c;它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...

MinIO Docker 部署:仅开放一个端口

MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...

Vite中定义@软链接

在webpack中可以直接通过符号表示src路径&#xff0c;但是vite中默认不可以。 如何实现&#xff1a; vite中提供了resolve.alias&#xff1a;通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...

如何应对敏捷转型中的团队阻力

应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中&#xff0c;明确沟通敏捷转型目的尤为关键&#xff0c;团队成员只有清晰理解转型背后的原因和利益&#xff0c;才能降低对变化的…...

水泥厂自动化升级利器:Devicenet转Modbus rtu协议转换网关

在水泥厂的生产流程中&#xff0c;工业自动化网关起着至关重要的作用&#xff0c;尤其是JH-DVN-RTU疆鸿智能Devicenet转Modbus rtu协议转换网关&#xff0c;为水泥厂实现高效生产与精准控制提供了有力支持。 水泥厂设备众多&#xff0c;其中不少设备采用Devicenet协议。Devicen…...