分布式锁—3.Redisson的公平锁二
大纲
1.Redisson公平锁RedissonFairLock概述
2.公平锁源码之加锁和排队
3.公平锁源码之可重入加锁
4.公平锁源码之新旧版本对比
5.公平锁源码之队列重排
6.公平锁源码之释放锁
7.公平锁源码之按顺序依次加锁
4.公平锁源码之新旧版本对比
(1)新版本再次加锁失败不会刷新排队分数(等待超时的时间点timeout)
(2)旧版本再次加锁失败会刷新排队分数(等待超时的时间点timeout)
当客户端线程尝试加公平锁失败处于排队状态时,会进入while循环。在while循环中,每次都会等待一段时间,再重新进行尝试加公平锁。
public class RedissonLock extends RedissonBaseLock {...//加锁@Overridepublic void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//线程ID,用来生成设置Hash的值long threadId = Thread.currentThread().getId();//尝试加锁,此时执行RedissonLock.lock()方法默认传入的leaseTime=-1Long ttl = tryAcquire(-1, leaseTime, unit, threadId);//ttl为null说明加锁成功if (ttl == null) {return;}//加锁失败时的处理CompletableFuture<RedissonLockEntry> future = subscribe(threadId);if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);} else {commandExecutor.syncSubscription(future);}try {while (true) {//再次尝试获取锁ttl = tryAcquire(-1, leaseTime, unit, threadId);//返回的ttl为null,获取到锁,就退出while循环if (ttl == null) {break;}//返回的ttl不为null,则说明其他客户端或线程还持有锁//那么就利用同步组件Semaphore进行阻塞等待一段ttl的时间if (ttl >= 0) {try {commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {if (interruptibly) {commandExecutor.getNow(future).getLatch().acquire();} else {commandExecutor.getNow(future).getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(commandExecutor.getNow(future), threadId);}}...
}
假设第二个客户端线程第一次加锁是在10:00:00,然后在10:00:15该客户端线程再次发起请求尝试进行加锁,但第一个客户端线程在10:00:00~10:00:15之间一直持有这把锁,此时第二个客户端线程的再次加锁流程如下:
(1)新版本再次加锁失败不会刷新排队分数(等待超时的时间点timeout)
步骤一:进入while循环,移除等待超时的线程。执行命令"lindex redisson_lock_queue:{myLock} 0",获取队列排第一元素。此时获取到UUID2:ThreadID2,代表着第二个客户端线程正在队列里排队。
继续执行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",从有序集合中获取UUID2:ThreadID2对应的分数,比如获取到的timeout = 10:05:20。根据当前时间是10:00:15,那么timeout <= 10:00:15的这个条件不成立,于是退出while循环。
步骤二:判断当前线程现在能否尝试获取锁,发现不能通过。因为执行命令"exists myLock"时,发现锁已经存在。
步骤三:判断锁是否已经被当前线程持有。由于第二个客户端线程的UUID + 线程ID必然不等于第一个客户端线程,所以此时执行命令"hexists myLock UUID2:ThreadID2",发现不存在,所以此处的可重入锁的判断条件也不成立。
步骤四:判断当前获取锁失败的线程是否已经在队列中排队。由于当前线程是第二次尝试获取锁,所以判断通过。然后返回第二个客户端线程等待获取锁时,还剩多少时间就超时,不会刷新排队分数。
//Redisson的3.16.8版本
if (command == RedisCommands.EVAL_LONG) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,//步骤一:remove stale threads,移除等待超时的线程"while true do " +//获取队列中的第一个元素//KEYS[2]是一个用来对线程排队的队列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end;" +//获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间//KEYS[3]是一个用来对线程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除//ARGV[4]是当前时间"if timeout <= tonumber(ARGV[4]) then " +//从有序集合 + 队列中移除这个线程"redis.call('zrem', KEYS[3], firstThreadId2);" +"redis.call('lpop', KEYS[2]);" +"else " +"break;" +"end;" +"end;" +//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;KEYS[2]是对线程排队的队列;//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID + ThreadID;"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步骤三:当前线程执行获取锁的操作//弹出队列的第一个元素 + 从有序集合中删除UUID:ThreadID对应的元素"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +//递减有序集合中每个线程的分数,也就是递减每个线程获取锁时的已经等待时间//zrange返回有序集合KEYS[3]中指定区间内(0,-1)的成员,也就是全部成员"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +//对有序集合KEYS[3]的成员keys[i]的score减去:tonumber(ARGV[3])//ARGV[3]就是线程获取锁时可以等待的时间,默认是5分钟"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +//hset设置Hash值进行加锁操作 + pexpire设置锁key的过期时间 + 最后返回nil表示加锁成功"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +//步骤四:判断锁是否已经被当前线程持有(可重入锁),KEYS[1]是锁的名字,ARGV[2]是当前线程的UUID+ThreadID;"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2],1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +//步骤五:判断当前获取锁失败的线程是否已经在队列中排队//KEYS[3]是对线程排序的有序集合,ARGV[2]是当前线程的UUID + ThreadID;"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +"if timeout ~= false then " +//如果当前获取锁失败的线程已经在队列中排队//那么就返回该线程等待获取锁时,还剩多少时间就超时了,外部代码拿到这个时间会阻塞等待这个时间//ARGV[3]是当前线程获取锁时可以等待的时间,ARGV[4]是当前时间"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +"end;" +//步骤六:对获取锁失败的线程进行排队处理"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +"local ttl;" +//如果在队列中排队的最后一个元素不是当前线程"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +//lastThreadId是在队列中排最后的线程,ARGV[2]是当前线程的UUID + 线程ID,ARGV[4]是当前时间//因为拥有最大过期时间的线程在队列中是排最后的//所以可通过队列中的最后一个元素的过期时间,计算当前线程的过期时间//从而保证新加入队列和有序集合的线程的过期时间是最大的//下面这一行会计算出:还有多少时间,当前队列中排最后的线程就会过期,外部代码拿到这个时间会阻塞等待这个时间 //这样后一个加入队列的线程,会阻塞等待前一个加入队列的线程的过期时间"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +"else " +//下面这一行会计算出:还有多少时间,锁就会过期,外部代码拿到这个时间会阻塞等待这个时间"ttl = redis.call('pttl', KEYS[1]);" +"end;" +//计算当前线程在排队等待锁时的过期时间"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +//把当前线程作为一个元素插入有序集合,并设置元素分数为该线程在排队等待锁时的过期时间//然后再把当前线程作为一个元素插入队列尾部"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end;" +"return ttl;",Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),unit.toMillis(leaseTime),getLockName(threadId),wait,//默认是5分钟currentTime);
}
(2)旧版本再次加锁失败会刷新排队分数(等待超时的时间点timeout)
旧版本公平锁的lua脚本如下所示,当第二个客户端线程再次加锁时会再次进入排队逻辑。
首先会出计算队列中的第一个元素还有多少时间就超时,即ttl。然后根据ttl + 传入的等待时间,计算当前线程等待锁的超时时间timeout。
接着执行命令"zadd redisson_lock_timeout:{myLock} timeout UUID2:ThreadID2",刷新有序集合中的同名元素的分数为timeout。客户端线程每次重复尝试加锁,都会将其对应的过期时间往后延长,也就是刷新了排队的分数。
zadd命令在添加存在的元素时,会返回0,但会更新该元素的分数。
//Redisson的3.8.1版本
if (command == RedisCommands.EVAL_LONG) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,//步骤一:移除等待超时的线程"while true do " +//获取队列中的第一个元素//KEYS[2]是一个用来对线程排队的队列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end; " +//获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间//KEYS[3]是一个用来对线程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除//ARGV[4]是当前时间"if timeout <= tonumber(ARGV[4]) then " +//从有序集合 + 队列中移除这个线程"redis.call('zrem', KEYS[3], firstThreadId2); " +"redis.call('lpop', KEYS[2]); " +"else " +"break;" +"end; " +"end;" +//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;KEYS[2]是对线程排队的队列;//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID+ThreadID;"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步骤三:当前线程执行获取锁的操作//弹出队列的第一个元素 + 从有序集合中删除UUID:ThreadID对应的元素"redis.call('lpop', KEYS[2]); " +"redis.call('zrem', KEYS[3], ARGV[2]); " +//hset设置Hash值进行加锁操作 + pexpire设置锁key的过期时间 + 最后返回nil表示加锁成功"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//步骤四:判断锁是否已经被当前线程持有,KEYS[1]是锁的名字,ARGV[2]是当前线程的UUID+ThreadID;"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//步骤五:对获取锁失败的线程进行排队处理"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +"local ttl; " +//如果在队列中排队的第一个元素不是当前线程"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " +//计算队列中第一个元素还有多少时间就超时了"ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +"else " +"ttl = redis.call('pttl', KEYS[1]);" +"end; " +//计算当前线程等待锁的超时时间"local timeout = ttl + tonumber(ARGV[3]);" +//把当前线程作为一个元素插入有序集合,并设置元素分数为该线程在排队等待锁时的过期时间//然后再把当前线程作为一个元素插入队列尾部"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end; " +"return ttl;",Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),//KEYS[1]、KEYS[2]、KEYS[3]internalLockLeaseTime,//ARGV[1]getLockName(threadId),//ARGV[2]currentTime + threadWaitTime,//ARGV[3] = 当前时间 + 5秒currentTime//ARGV[4]);
}
注意:如果仅仅使用有序集合是不行的,因为有序集合的分数在lua脚本执行过程中也会发生变化。旧版本中,客户端线程每次尝试加锁,有序集合中的分数会更新。新版本中,当前线程可以尝试获取锁时,也会遍历更新有序集合中的分数。
此外,有序集合获取第一个元素的时间复杂度比队列要高。如果仅仅使用队列也是不行的,因为需要管理排队线程的等待超时时间。如果没有有序集合,那么就不能移除在队列中排队已超时的线程。当然,为了管理线程的等待超时时间,将有序集合换成两层Hash值也可以。
5.公平锁源码之队列重排
(1)新版本在5分钟后尝试再次加锁才会队列重排
(2)旧版本在5秒后尝试再次加锁就会队列重排
(3)导致队列重排的是lua脚本的步骤一(移除等待超时的线程)
(1)新版本在5分钟后尝试再次加锁才会队列重排
新版本的公平锁中,获取锁失败的线程默认会进入队列最多等待5分钟。
在这5分钟内,该线程不管再次加锁多少次,都不会刷新队列排序和分数。
在这5分钟内,该线程没有进行再次加锁尝试,就会被移出队列和有序集合。所以5分钟后,该线程才尝试再次加锁,那么会重新入队,导致队列重排。
(2)旧版本在5秒后尝试再次加锁就会队列重排
旧版本的公平锁中,获取锁失败的线程默认会进入队列最多等待5秒钟。
在这5秒钟内,该线程只要重新尝试进行加锁,那么就会延长其最多等待时间,也就是刷新有序集合中的排队分数。
在这5秒钟内,该线程没有进行再次加锁尝试,就会被移出队列和有序集合。所以5秒钟后,该线程才尝试再次加锁,那么会重新入队,导致队列重排。
(3)导致队列重排的是lua脚本的步骤一(移除等待超时的线程)
也就是公平锁lua脚本中while循环的作用。
当客户端线程使用RedissonLock的tryAcquire()方法尝试获取公平锁,并且指定了一个获取锁的超时时间时。比如指定客户端线程在队列里排队超过了20秒,就不再尝试获取锁了。如果获取锁的超时时间没有指定,新版本是默认5分钟超时,旧版本是默认5秒后超时。
此时由于这些等待获取锁已超时的线程元素还存在队列和有序集合里,所以可以通过while循环的逻辑来清除这些不再尝试获取锁的客户端线程。
在新版本,随着时间推移,这些等待获取锁超时的线程就会被移出队列。在旧版本,随着时间推移,这些等待获取锁超时的线程只要不再尝试加锁,那么其等待获取锁的超时时间就不会更新被不断延长,就会被移除队列。
如果客户端宕机了,那么客户端就不会重新尝试获取锁。在新版本中,随着时间推移,宕机的客户端线程就会被移出队列。在旧版本中,就不会刷新和延长有序集合中的超时时间分数,这样while循环的逻辑就会将这些宕机的客户端线程从队列中移出。
在新版本中,最多5分钟后,宕机的客户端线程会被移出队列。在旧版本中,最多5秒钟后,宕机的客户端线程就会被移出队列。
因为网络延迟等原因,可能会导致客户端线程等待锁时间过长,从而触发各个客户端线程的排队顺序的重排序。有的客户端如果在队列里等待时间过长,可能就会触发一次队列的重排序。新版本触发重排序的频率是每5分钟,旧版本触发重排序的频率是每5秒。
//步骤一:移除等待超时的线程
"while true do " +//获取队列中的第一个元素//KEYS[2]是一个用来对线程排队的队列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end; " +//获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间//KEYS[3]是一个用来对线程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除//ARGV[4]是当前时间"if timeout <= tonumber(ARGV[4]) then " +//从有序集合 + 队列中移除这个线程"redis.call('zrem', KEYS[3], firstThreadId2); " +"redis.call('lpop', KEYS[2]); " +"else " +"break;" +"end; " +
"end;" +
6.公平锁源码之释放锁
(1)释放公平锁的流程
(2)释放公平锁的lua脚本分析
(1)释放公平锁的流程
释放公平锁首先调用的还是RedissonLock的unlock()方法。
在RedissonLock的unlock()方法中,会调用get(unlockAsync())。也就是首先调用RedissonBaseLock的unlockAsync()方法,然后调用RedissonObject的get()方法。
其中个RedissonBaseLock的unlockAsync()方法是异步化执行的方法,释放锁的操作是异步执行的。而RedisObject的get()方法会通过RFuture同步等待获取异步执行的结果。所以,可以将get(unlockAsync())理解为异步转同步。
在RedissonBaseLock的unlockAsync()方法中,就会调用公平锁RedissonFairLock的unlockInnerAsync()方法进行释放锁。然后当完成释放锁的处理后,会通过异步去取消定时调度任务。
public class Application {public static void main(String[] args) throws Exception {Config config = new Config();config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");//创建RedissonClient实例RedissonClient redisson = Redisson.create(config);//获取公平的可重入锁RLock fairLock = redisson.getFairLock("myLock");fairLock.lock();fairLock.unlock();...}
}public class RedissonLock extends RedissonBaseLock {...@Overridepublic void unlock() {...//异步转同步//首先调用的是RedissonBaseLock的unlockAsync()方法//然后调用的是RedissonObject的get()方法get(unlockAsync(Thread.currentThread().getId()));...}...
}public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {...@Overridepublic RFuture<Void> unlockAsync(long threadId) {//异步执行释放锁的lua脚本RFuture<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {//取消定时调度任务cancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}protected abstract RFuture<Boolean> unlockInnerAsync(long threadId);...
}public class RedissonFairLock extends RedissonLock implements RLock {private final long threadWaitTime;private final CommandAsyncExecutor commandExecutor;private final String threadsQueueName;private final String timeoutSetName;public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {this(commandExecutor, name, 60000*5);}public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.threadWaitTime = threadWaitTime;threadsQueueName = prefixName("redisson_lock_queue", name);timeoutSetName = prefixName("redisson_lock_timeout", name);}@Overrideprotected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,//步骤一:移除等待超时的线程"while true do " +//获取队列中的第一个元素//KEYS[2]是一个用来对线程排队的队列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end; " +//获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间//KEYS[3]是一个用来对线程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除//ARGV[4]是当前时间"if timeout <= tonumber(ARGV[4]) then " +//从有序集合 + 队列中移除这个线程"redis.call('zrem', KEYS[3], firstThreadId2); " +"redis.call('lpop', KEYS[2]); " +"else " +"break;" +"end; " +"end;" +//步骤二:判断锁是否还存在,判断key为锁名的Hash值是否存在"if (redis.call('exists', KEYS[1]) == 0) then " +//获取队列中排第一的线程"local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " +//ARGV[1]为通知事件的类型"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +"end; " +"return 1; " +"end;" +//步骤二:判断锁是否还存在,判断key为UUID+线程ID的Hash值是否存在"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//对key为UUID+线程ID的Hash值还存递减1"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"end; " +"redis.call('del', KEYS[1]); " +"local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " +//发布一个事件给在队列中排第一的线程"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +"end; " +"return 1; ",Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),LockPubSub.UNLOCK_MESSAGE,//ARGV[1]internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());}...
}
(2)释放公平锁的lua脚本分析
步骤一:移除等待超时的线程
首先也会进入while循环,移除等待超时的线程。即获取队列中排第一的线程,判断该线程的过期时间是否已小于当前时间。如果小于当前时间,那么就说明该线程在队列中的排队已经过期,于是便将该线程从有序集合 + 队列中移除。后续如果该线程再次尝试加锁,那么会重新排序 + 重新入队。
步骤二:判断锁是否还存在
如果key为锁名的Hash值已不存在,那么先获取队列中排第一的线程,然后发布一个事件给该线程对应的客户端让其获取锁。
如果key为锁名的Hash值还存在,那么判断field为UUID + 线程ID的映射是否存在。如果field为UUID + 线程ID的映射不存在,那么表示锁已经被释放了,直接返回nil。如果field为UUID + 线程ID的映射存在,那么在key为锁名的Hash值中,对field为UUID + 线程ID的value值递减1。也就是调用Redis的hincrby命令,进行递减1处理。
步骤三:对递减1后的结果进行如下判断处理
如果递减1后的结果大于0,表示线程还在持有锁。对应于持有锁的线程多次重入锁,此时需要重置锁的过期时间。
如果递减1后的结果小于0,表示线程不再持有锁,则删除锁对应的key,并且发布一个事件给在队列中排第一的线程所对应的客户端。
7.公平锁源码之按顺序依次加锁
(1)锁被释放后,排第二的客户端线程先来加锁
(2)锁被释放后,排第一的客户端线程再来加锁
假设客户端A先持有锁,而客户端B在队列里面是排在客户端C的后面。那么如果客户端A释放了锁后,客户端B和C是如何按顺序加锁的。
(1)锁被释放后,排第二的客户端线程先来加锁
锁被客户端A释放掉,锁key被删除之后,客户端B先来进行尝试加锁。此时客户端B执行的lua脚本步骤二的逻辑:
//check if the lock can be acquired now
//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁
//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;
//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +...
"end;"
首先,执行判断"exists myLock = 0",由于当前锁存在,所以条件不成立。
然后,执行判断"exists redisson_lock_queue:{myLock} = 0",由于队列存在,所以条件不成立。
接着,执行判断"lindex redisson_lock_queue:{myLock} 0 == UUID2:ThreadID2",由于队列存在,但是在队列中排第一的不是客户端B而是客户端C,所以条件不成立,客户端B无法加锁。
由此可见:即使锁释放掉后,多个客户端来尝试加锁也只认队列中排第一的客户端。从而实现按队列的顺序依次获取锁,保证了公平性。
(2)锁被释放后,排第一的客户端线程再来加锁
当在队列中排第一的客户端C此时过来尝试加锁时,就会执行如下步骤三的尝试加锁逻辑:
//check if the lock can be acquired now
//步骤二:判断当前线程现在能否尝试获取锁,以下两种情况可以通过判断去进行尝试获取锁
//情况一:锁不存在 + 队列也不存在;KEYS[1]是锁的名字;KEYS[2]是对线程排队的队列;
//情况二:锁不存在 + 队列存在 + 队列的第一个元素就是当前线程;ARGV[2]是当前线程的UUID+ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步骤三:当前线程执行获取锁的操作//remove this thread from the queue and timeout set//弹出队列的第一个元素 + 从有序集合中删除UUID:ThreadID对应的元素"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +//decrease timeouts for all waiting in the queue//递减有序集合中每个线程的分数,也就是递减每个线程获取锁时的已经等待时间//zrange返回有序集合KEYS[3]中指定区间内(0,-1)的成员,也就是全部成员"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +//对有序集合KEYS[3]的成员keys[i]的score减去:tonumber(ARGV[3])//ARGV[3]就是线程获取锁时可以等待的时间,默认是5分钟"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +//acquire the lock and set the TTL for the lease//hset设置Hash值进行加锁操作 + pexpire设置锁key的过期时间 + 最后返回nil表示加锁成功"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +
"end;"
首先,执行命令"lpop redisson_lock_queue:{myLock}",将队列中的第一个元素弹出来。
然后,执行命令"zrem redisson_lock_timeout:{myLock} UUID3:ThreadID3",将有序集合中客户端C的线程对应的元素给删除掉。
接着,执行"hset myLock UUID3:ThreadID3 1"进行加锁,设置field为UUID + 线程ID的value值为1。
最后,执行命令"pexpire myLock 30000",设置key为锁名的Hash值的过期时间为30000毫秒。
客户端C完成加锁后,客户端C就会从队列中出队,此时排在队头的就是客户端B。
相关文章:
分布式锁—3.Redisson的公平锁二
大纲 1.Redisson公平锁RedissonFairLock概述 2.公平锁源码之加锁和排队 3.公平锁源码之可重入加锁 4.公平锁源码之新旧版本对比 5.公平锁源码之队列重排 6.公平锁源码之释放锁 7.公平锁源码之按顺序依次加锁 4.公平锁源码之新旧版本对比 (1)新版本再次加锁失败不会刷新…...
C# 类库打包dll文件
目录 前言操作流程注意事项 前言 在C#中,有多种方式可以对代码进行加密,以保护源代码不被轻易查看或修改,这篇文章主要介绍将C# cs类文件加密为dll文件的方式进行保护。 操作流程 在 Visual Studio 中,选择“创建新项目”。 选…...
DELL EMC Unity存储如何让控制器进入service mode和退出service mode
近期遇到好几个关于DELL EMC unity (VNXe)存储系统挂掉的案例,都是很后期才寻找支持到我们这里,然后再看问题,已经变得很复杂,几乎都是从一个相对简单的问题搞成了一锅粥甚至最后丢数据的情况。 为此&…...
【微知】如何通过mlxlink查看Mellanox网卡和光模块相关的信息?( mlxlink -d 01:00.0 -m)
背景 通过mlxlink可以查看Mellanox网卡的一些链路信息和硬件信息,也可以查看所插入的光模块的一些信息。 兄弟篇通过ethtool查看的方法:如何查看Mellanox网卡上的光模块的信息? 命令 mlxlink -d 01:00.0 -mman手册介绍: 如果…...
FPGA开发,使用Deepseek V3还是R1(1):应用场景
以下都是Deepseek生成的答案 FPGA开发,使用Deepseek V3还是R1(1):应用场景 FPGA开发,使用Deepseek V3还是R1(2):V3和R1的区别 FPGA开发,使用Deepseek V3还是R1&#x…...
Linux系列:如何用 C#调用 C方法造成内存泄露
一个简单的非托管内存泄露 1. 构建 so 文件 在 Windows 平台上我们会通过 MSVC 编译器将 C代码编译出一个成品 .dll,在 Linux 上通常会借助 gcc 将 c 编译成 .so 文件,这个.so 全称 Shared Object,为了方便讲解,先上一段简单的代码…...
C# 数据类型相关
分类 按照数据复杂程度 按照数据存储 类型转换 隐式转换 隐式转换无法完成由精度高的数据类型向精度低的数据类型转换 显式转换 又称为强制类型转换,显示转换不一定总是成功,且转换过程中可能出现数据丢失 int num 666;float result (float)num; …...
Python Web应用开发之Flask框架——基础
一、前言 在即将开启的 Flask 学习之旅中,为了能够顺利掌握并运用 Flask 进行 Web 开发,您需要具备一定的基础知识,同时了解相应的运行环境。 需要你具备的知识:Python 编程语言、HTML、CSS、HTTP协议、数据库(如:MySQL、MongoDB) 本文所使用的环境:操作系统Windows…...
分析白屏winscope
在 Android 设备上,播放视频时锁屏后解锁出现闪白屏的问题,通常与 Surface 生命周期、视频渲染或 UI 刷新机制有关。要定位和解决这个问题,可以按照以下步骤进行分析,并利用 WinScop 工具(如果适用)来辅助调…...
使用Word时无法粘贴,弹出错误提示:运行时错误‘53‘:文件未找到:MathPage.WLL
报错说明 使用Word时无法粘贴,粘贴时弹出提示如下: 一般出现这种情况时,我想你是刚装完MathType不久,博主装的是MathType7版本,出现了这个问题。 出现这个问题的原因是"mathpage.wll"这个文件在Office的插…...
玩转python: 深度解析Python高阶函数及推导式
1 高阶函数:工程化编程的基石 1.1 高阶函数基础概念 高阶函数(Higher-Order Function)是函数式编程范式的核心要素,指能够接受函数作为参数或返回函数作为结果的函数。在Python中,这类函数构成了数据处理的基础架构&…...
DeepSeek vs Grok vs ChatGPT:大模型三强争霸,谁将引领AI未来?
DeepSeek vs. Grok vs. ChatGPT:大模型三强争霸,谁将引领AI未来? 在人工智能领域,生成式模型的竞争已进入白热化阶段。DeepSeek、Grok和ChatGPT作为三大代表性工具,凭借独特的技术路径和应用优势,正在重塑…...
VSCode详细安装步骤,适用于 Windows/macOS/Linux 系统
以下是 Visual Studio Code (VSCode) 的详细安装步骤,适用于 Windows/macOS/Linux 系统: VSCode 的详细安装步骤 一、Windows 系统安装1. 下载安装包2. 运行安装程序3. 验证安装 二、macOS 系统安装1. 方法一:官网下载安装包2. 方法二&#x…...
Linux第五讲----gcc与g++,makefile/make
1.代码编译 1.1预处理 我们通过vim编辑完文件之后,想看一下运行结果这时我们便可以试用gcc编译C语言,g编译c. 编译代码: 上述两种方法均可,code.c是我的c语言文件,mycode是我给编译后产生的二进制文件起的名&#x…...
ubuntu22.04下Meshlab打开obj文件闪退——使用Appimage并放入收藏夹中
文章目录 ubuntu22.04下Meshlab打开obj文件闪退,查了下是meshlab的apt没做好。 官网下载:https://www.meshlab.net/#download 赋予权限 sudo chmod a+x MeshLab2023.12-linux.AppImage 双击运行即可 打开权限——下面操作是放在桌面上的 创建桌面快捷方式 # 在 ~/desktop (…...
MAVEN的环境配置
在下载好maven后或解压maven安装包后进行环境配置 1.在用户环境变量中 新建一个MAVEN_HOME 地址为MAVEN目录 注:地址为解压后maven文件的根目录!!! 2.在系统环境变量的path中添加该变量 %MAVEN_HOME%\bin 3. 测试maven安装是否成…...
强化学习无痛上手笔记第1课
文章目录 Markov Decision ProcessDefinitionRelated Concepts Policy for MDP AgentDefinitionJudgement for PolicyValue FunctionsTD formula for value functionsRelation of V and QPolicy CriterionPolicy Improvement TheoremOptimal PolicyReinforcement Learning Fund…...
智能设备上的 AI 移植与部署:新趋势与实践案例
1. 引言:智能设备如何运行 AI? 随着人工智能(AI)技术的快速发展,AI 计算已经从云端走向边缘,嵌入到智能设备中,如智能手机、智能摄像头、机器人、自动驾驶汽车等。这种本地化 AI 计算能够减少延…...
【USRP】NVIDIA Sionna:用于 6G 物理层研究的开源库
目录 Sionna:用于 6G 物理层研究的开源库主要特点实现6G研究的民主化支持 5G、6G 等模块化、可扩展、可伸缩快速启动您的研究 好处原生人工智能支持综合研究平台开放生态系统 安装笔记使用 pip 安装基于Docker的安装从源代码安装“你好世界!”探索锡奥纳…...
LLM大型语言模型(一)
1. 什么是 LLM? LLM(大型语言模型)是一种神经网络,专门用于理解、生成并对人类文本作出响应。这些模型是深度神经网络,通常训练于海量文本数据上,有时甚至覆盖了整个互联网的公开文本。 LLM 中的 “大” …...
BUU44 [BJDCTF2020]ZJCTF,不过如此1 [php://filter][正则表达式get输入数据][捕获组反向引用][php中单双引号]
题目: 我仿佛见到了一位故人。。。也难怪,题目就是ZJCTF 按要求提交/?textdata://,I have a dream&filenext.php后: ......不太行,好像得用filephp://filter/convert.base64-encode/resourcenext.php 耶?那 f…...
软考中级-数据库-3.3 数据结构-树
定义:树是n(n>=0)个结点的有限集合。当n=0时称为空树。在任一非空树中,有且仅有一个称为根的结点:其余结点可分为m(m>=0)个互不相交的有限集T1,T2,T3...,Tm…,其中每个集合又都是一棵树,并且称为根结点的子树。 树的相关概念 1、双亲、孩子和兄弟: 2、结点的度:一个结…...
磁盘空间不足|如何安全清理以释放磁盘空间(开源+节流)
背景: 最近往数据库里存的东西有点多,磁盘不够用 查看磁盘使用情况 df -h /dev/sda5(根目录 /) 已使用 92% 咱们来开源节流 目录 背景: 一、开源 二、节流 1.查找 大于 500MB 的文件: 1. Snap 缓存…...
SpringCloud系列教程(十二):网关配置动态路由
除了token以外,还有一个很实用的功能就是把网关的路由配置放到nacos上,并且修改路由配置的时候,网关服务可以动态的更新,这样我们在调整网络配置的时候,就不用重启服务了。所以我们需要用到两个重要的类:Na…...
Java-实现PDF合同模板填写内容并导出PDF文件
可用于公司用户合同导出pdf文件 效果图 一、导入所需要jar包 <!--生成PDF--><dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId><version>5.5.11</version></dependency><dependency&…...
基于STM32的环境监测系统(自制蓝牙APP)
目录 项目概述 实物图 演示视频 概述 硬件模块 原理图以及PCB 0.96寸OLED屏幕(SSD1306) CubeMX配置 初始化代码 MQ-2烟雾传感器 CubeMX配置 初始化代码 DHT11温湿度模块 驱动代码 HC-05蓝牙模块 CubeMX配置 编辑 空闲中断回调函数 有…...
C++ Windows下屏幕截图
屏幕截图核心代码(如果要求高帧率,请使用DxGI): // RGB到YUV的转换公式 #define RGB_TO_Y(r, g, b) ((int)((0.299 * (r)) (0.587 * (g)) (0.114 * (b)))) #define RGB_TO_U(r, g, b) ((int)((-0.169 * (r)) - (0.331 * (g)) …...
月结保障:回滚慢、行锁频发
问题背景 3.1号月结现场保障,到场了先让kill了一个账务的会话,回滚了20min,巡检的时候发现报表库有几条行锁:enq: TX - row lock contention,sql:delete from table_name 语句已经失败,正在回滚…...
Golang的微服务服务发现机制
## 1. Golang微服务服务发现机制 微服务架构已经成为当今软件开发的主流趋势,它能将复杂的单体应用拆分成小而独立的服务单元,实现更快的开发、部署和扩展。在微服务架构中,服务发现是非常重要的一环,它能够实现服务之间的自动发现…...
Keepalived 入门详解:高可用集群部署最佳实践!
1. 什么是 Keepalived? 在分布式集群中,单点故障(SPOF) 是影响系统稳定性的重要问题。Keepalived 作为一款高可用服务软件,可以有效防止集群单点故障,保障系统的高可用性。 Keepalived 最初是为 LVS&#…...
