Java的AQS框架是如何支撑起整个并发库的
如何设计一个抽象队列同步器
- 引言
- AQS需要解决哪些场景下的问题
- 互斥模式
- 获取锁
- 抢锁失败入队
- 释放锁
- 小总结
- 共享模式
- 获取共享资源
- 释放共享资源
- 唤醒丢失问题
- 小总结
- 混合模式
- 获取写锁
- 释放写锁
- 获取读锁
- 读锁是否应该阻塞
- 释放读锁
- 小总结
- 栅栏模式
- 等待
- 递减计数
- 条件变量模式
- 等待条件成立
- 条件满足,唤醒等待的节点
- 小总结
- 小结
引言
AQS 抽象队列同步器(AbstractQueuedSynchronizer) 作为Java并发库的基石,像ReentrantLock,ThreadPoolExecutor,Semaphore等类都使用到AQS完成线程间同步,本文主要来和大家分享一下我自己对AQS实现和设计理念的一些新的思考。
AQS(抽象队列同步器)核心由两部分组成:
- 条件变量
- 等待队列
条件变量很简单,所有场景下的条件变量都可以泛化为一个整数值,因此AQS使用一个整数来表示条件变量:
private volatile int state;
考虑到等待线程数量未知,并且经常发生出入队动作,因此AQS队列采用链表结构实现,那么下面我们首先来看看AQS针对的哪些场景下的问题。
AQS需要解决哪些场景下的问题
AQS需要解决哪些场景下的问题 , 泛化而言有五个方面:
-
互斥模式
- 互斥模式下 ,state == 1 表示资源被占有 ,state == 0 表示资源空闲 , state > 1 表示可重入情况
- 互斥模式下 ,state == 1 表示资源被占有 ,state == 0 表示资源空闲 , state > 1 表示可重入情况
-
共享模式: state > 0 表示资源有剩余 , state == 0 表示资源不足
-
混合互斥和共享的模式: 当需要对资源进行修改时,需要转到互斥模式,仅仅是对资源进行读取操作,可以处在共享模式下,如: 读写锁场景
-
栅栏模式: state为剩余未到达栅栏处的线程数,先到达栅栏处的线程需要等待直到剩余线程都达到栅栏 , 当state=0时,说明所有线程都到达栅栏处,此时打开栅栏,即唤醒所有线程继续执行
-
条件变量: AQS支持多条件变量,条件变量需要锁的保护,所以当AQS使用条件变量时,要求处于互斥模式下,此时互斥模式充当互斥锁对条件变量进行保护,防止虚假唤醒问题发生
如果大家仔细观察会发现以上四种模式本质都是条件变量的应用,不同之处在于对何时满足条件这个定义不同,因此AQS也支持第五种模式条件变量,当以上四种常见均不能满足你的需求时,由你自己设计满足条件的定义是什么。
关于条件变量的实现,这里我想特别展开说明一点: 为什么条件变量需要锁的保护 ?大家可以反向思考一下,如果没有锁的保护,条件变量的使用会出现什么问题 ?
如果不使用锁来保护你正在等待的数据,就会出现虚假唤醒的问题,这个问题出现的本质是因为第1步和第4步之间存在一个时间窗口,在这个时间窗口内,如果线程2执行notify操作,那么将使得线程1错过唤醒机会,从而出现Lost Wakeup的问题。
解决Lost Wakeup问题的方法就是使用锁来保护等待的数据,也就是条件变量,如下图所示:
当然,如果这边继续深入下去的话,大家还可以思考一点: 条件变量和锁有什么联系 ? 两者表现行为都是在条件不满足时阻塞,条件满足时被唤醒,所以泛化来看,锁本质也是条件变量的一种应用,那么锁的使用上也存在虚假唤醒问题吗?
首先,如果按照问题所述,显然是存在虚假唤醒问题的,如上图所述,但是这里的错误在于问题混淆了锁的概念,严格来说只存在两种锁:
- 自旋锁
- 睡眠锁
对于睡眠锁而言,其本质就是条件变量的一种应用,所以存在虚假唤醒问题,需要解决。而自旋锁这是通过不断CAS+重试来实现锁,对于自旋锁而言是不存在虚假唤醒问题的:
所以为了解决睡眠锁可能存在的虚假唤醒问题,我们需要使用自旋锁来保证睡眠锁的条件变量example的安全性,添加了自旋锁保护的睡眠锁实现如下:
相信经过了上面的讲解,大家已经理解了为什么条件变量需要锁的保护了,我们常说的锁其实属于睡眠锁,睡眠锁本质也是对条件变量的一种实现,那么当某个线程获取锁失败后,需要进入锁队列中挂起等待,如下图所示:
如果我们使用锁来保护条件变量,此时成功获取了锁,但是发现条件不满足的线程又会进入条件变量关联的队列中阻塞等待,如下图所示:
条件变量需要关联一个条件队列,睡眠锁本质也是条件变量的应用,因此上面说的锁队列本质也是条件队列,而锁保护的条件变量关联的队列我们也称之为条件队列,因此为了区分,本文后面都将称前者为锁队列。
还有一点很重要,就是条件变量只能在互斥锁保护下使用,共享锁模式下不能使用条件变量,否则还是会存在虚假唤醒问题:
上面已经铺垫了很多前置知识,对于AQS来说,其支持互斥锁,共享锁和多条件变量三大特性,因此其内部会存在两种类型队列,一种就是锁队列,还有一种就是条件变量队列,如果存在多个条件变量,那么就会存在多个条件队列,具体如下图所示:
下面我们从源码角度来分析一下三种模式的不同实现。
本文代码均基于JDK 11进行讲解。
互斥模式
当AQS运行在互斥模式下时,Node节点结构如下所示:
static final class Node {static final int CANCELLED = 1;static final int SIGNAL = -1;volatile Node prev;volatile Node next;volatile int waitStatus;// 省去互斥模式下使用不到的相关属性...}
其中waitStatus属性记录当前节点处于何种状态下,互斥模式下节点只可能存在三种状态:
- SIGNAL : 后继节点(successor)已经被(或将会被)阻塞(通过 park),因此当前节点必须在释放或取消时唤醒它的后继节点。
- CANCELLED: 该节点因超时或中断被取消。节点永远不会离开此状态。特别地,带有取消节点的线程不会再次阻塞。
- 0 : 节点状态值为0存在两个可能性
- 锁队列尾部节点的状态值为0,因为其没有后继节点需要唤醒 ;
- 释放锁时唤醒锁队列中第一个有效节点,此时会先将头结点的状态值设置为0,然后唤醒锁队列中第一个有效节点,被唤醒的节点尝试去获取锁,如果获取失败,会将头结点状态重新改回SIGNAL,然后再次尝试获取锁,如果获取锁成功,则将自己设置为头结点,此时头结点状态依旧为SIGNAL。
互斥模式下释放锁时会通过头结点的状态值判断当前锁队列是否还存在阻塞的线程,如果头结点状态值为0,表明当前头结点没有后继节点需要唤醒了,如果头结点值为1,表明头结点存在后继节点需要唤醒,因此我们需要对头结点的状态值变更尤其关注。
互斥模式的实现,这里以ReentrantLock为例进行说明。
获取锁
public final void acquire(int arg) {// tryAcquire: 不同模式下对获取锁成功的定义是不同的,对于互斥锁实现而言,state作为一个二元值,0表示锁空闲,1表示锁占用if (!tryAcquire(arg) &&// 如果获取锁失败,则进入锁队列进行排队等候,返回值代表等待过程中是否被打断过acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
不同模式下对获取锁成功的定义是不同的,因此AQS把tryAcquire方法的实现交由子类决定,对于ReentrantLock实现而言,state作为一个二元值,0表示锁空闲 , 1表示锁占用 , 大于1表示发生可重入:
下面以ReentrantLock公平锁为例进行讲解,公平体现在线程获取锁时,是否先看一眼锁队列是否已经有人在排队中,如果是,则自己排到锁队列末尾等候。
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 公平体现在每次抢锁前,先看一眼是否有人在等着了if (!hasQueuedPredecessors() &&// 如果锁队列为空,那么尝试去获取锁compareAndSetState(0, acquires)) {// 获取锁成功,设置当前锁的占有者为当前线程自己setExclusiveOwnerThread(current);return true;}}// 判断是否是锁重入else if (current == getExclusiveOwnerThread()) {// 累加锁重入计数int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
由于ReentrantLock支持锁重入,所以实际的state并不只有0和1两种状态。
抢锁失败入队
如果获取锁失败,当前线程进入锁队列排队等候,首先第一步新为当前线程新创建一个Node并添加到锁队列末尾:
private Node addWaiter(Node mode) {Node node = new Node(mode);// 为当前线程创建一个Node,此处Node为独占模式 for (;;) {// 锁队列是全局共享资源,因此这里采用CAS+重试确保尾插操作的原子性 Node oldTail = tail;if (oldTail != null) {node.setPrevRelaxed(oldTail);if (compareAndSetTail(oldTail, node)) {oldTail.next = node;return node;}} else {// 初始化锁队列initializeSyncQueue();}}}
第二步是把当前线程给挂起:,acquireQueued函数不仅包含把当前线程挂起的逻辑,还包含被唤醒后尝试抢锁的逻辑,如果失败继续挂起
final boolean acquireQueued(final Node node, int arg) {boolean interrupted = false;try {for (;;) {// 获取当前线程前置节点final Node p = node.predecessor();// 前置节点是头结点才有资格去抢锁if (p == head && tryAcquire(arg)) {// 抢锁成功,设置当前节点为头结点// 这里的头结点可以看做是Dummy NodesetHead(node);p.next = null; // help GC// 阻塞在锁队列等待锁的过程中是否被打断过return interrupted;}// 根据前驱节点状态,判断是否需要阻塞等待,还是再尝试获取一次锁if (shouldParkAfterFailedAcquire(p, node))// 挂起当前线程interrupted |= parkAndCheckInterrupt();}} catch (Throwable t) {// 挂起线程和唤醒后抢锁的逻辑过程中如果发生异常,则将当前Node状态设置为CANCELcancelAcquire(node);// 如果以上过程中线程被打断过,那么这里补充一次自我打断if (interrupted)selfInterrupt();throw t;}}
shouldParkAfterFailedAcquire函数根据节点的不同状态,判断当前线程自身是应该立即挂起,还是再尝试获取一次锁:
- 前置节点如果处于SIGNAL状态,则表明锁依旧被前驱节点所在线程持有,当前线程应该立即阻塞
- 前置节点处于取消状态,此时做一波清理工作,将所有处于取消态的节点都移出队列,然后当前线程再尝试偶去一次锁
- 前置节点状态值为0,那么前置节点可能为旧的尾节点或者头结点,如果是旧的尾节点,则将其状态更正为SIGNAL,因为当前节点变成新的尾节点了。如果前置节点是头结点,说明当前线程被唤醒后抢锁失败,则再次将头结点状态更正为SIGNAL。最后再尝试获取一次锁。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;// 当前节点的前置节点依然处于持有锁的状态,当前线程应该立即阻塞if (ws == Node.SIGNAL)return true; // 当前线程应该阻塞// 前置节点处于取消状态,此处从前置节点开始往前扫描,将所有处于取消状态的节点都移出锁队列 if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 此处waitStatues的值可能为0或者PROPAGATE(共享模式下节点才会存在该状态,此处不关心)pred.compareAndSetWaitStatus(ws, Node.SIGNAL);}return false;}
AQS队列采用双向链表实现,其中prev指针就用在对Cancel节点的回收中,其他场景不会用到。
parkAndCheckInterrupt方法将当前线程挂起,被唤醒后返回当前线程是否被打断过:
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
interrupted方法会清空打断标记,因此如果必要可以在被唤醒并成功抢到锁后补充一次自我打断的逻辑。
释放锁
release方法用于释放锁:
public final boolean release(int arg) {// 由子类实现决定本次锁释放是否成功if (tryRelease(arg)) {Node h = head;// 锁队列刚初始化的时候,头结点的waitStatus=0// 但是当第一个Node入队后,头结点的waitStatus应该等于SIGNAL,表明当前头结点有后继节点需要唤醒if (h != null && h.waitStatus != 0)// 说明锁队列不为空,此时唤醒后继结点unparkSuccessor(h);return true;}return false;}
tryRelease方法尝试去释放锁,不同模式下对锁被成功释放的定义不同,因此这个方法需要子类继承实现,此处给出ReentrantLock的实现:
protected final boolean tryRelease(int releases) {// 由于存在可重入清空,因此只有state递减为0时,才算锁被释放成功int c = getState() - releases;// 非法情况,只有持有锁的线程才能释放当前锁if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// state递减为0的时候,才算锁被释放成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
unparkSuccessor方法唤醒某个节点的后继结点,这里是唤醒头结点的后继结点:
private void unparkSuccessor(Node node) {int ws = node.waitStatus;// 将头结点状态值设置为0if (ws < 0)node.compareAndSetWaitStatus(ws, 0);// 获取后继节点Node s = node.next;// 如果当前节点没有后继节点或者后继节点处于取消状态,则向后寻找到第一个没有取消的节点// 然后唤醒该节点if (s == null || s.waitStatus > 0) {s = null;for (Node p = tail; p != node && p != null; p = p.prev)if (p.waitStatus <= 0)s = p;}// 唤醒节点if (s != null)LockSupport.unpark(s.thread);}
如果被唤醒的线程抢锁成功,则会在acquireQueued方法的for循环中将自己设置为新的头结点,此时新的头结点的状态值可能为SIGNAL或者0, 如果是SIGNAL说明当前节点还存在后继节点需要唤醒,否则说明当前节点是尾结点。
如果被唤醒的线程抢锁失败,则会在shouldParkAfterFailedAcquire方法中再次将头结点的状态设置为SIGNAL,因为当前头结点抢锁失败了。
小总结
AQS互斥模式的实现到此就讲解完毕了,整体而言还是很简单的,这里简单总结一下互斥模式下抢锁和释放锁的步骤 ,首先是抢锁步骤:
- 尝试获取锁
- 获取失败入队阻塞
- 获取锁成功则返回,返回前检查获取锁的过程中是否存在被吞没的中断,如果存在则补充一次自我中断
其次是释放锁步骤:
- 判断锁队列是否为空,为空则条件唤醒阶段
- 如果不为空则唤醒第一个有效节点
上面总结了一下AQS互斥模式的运行流程,下面我们来看看AQS互斥模式的实现过程中有哪些值得我们学习的并发设计技巧:
- AQS中有两个共享资源需要被保护,一个是state条件值,另一个是锁队列,这里重点在队列的头尾节点可能会出现并发问题。
- state变量可能存在的并发安全问题在多线程同时尝试抢锁,设置state的值为1,该场景下只需要单次CAS设置即可,如果失败,说明其他线程已经抢到锁了,那么当前线程执行入队阻塞流程。
- 对于队列头尾节点的访问而言,并发问题可能出在多线程共同尝试阻塞入队,此时需要CAS配合重试确保入队成功;
- 还有就是节点状态值的变更,该场景下只需要单次CAS即可,如果失败说明有其他线程更改了当前节点状态,那么当前线程的更新可以废弃。
共享模式
当AQS运行在共享模式下时,此时Node节点的结构如下所示:
static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED = 1;static final int SIGNAL = -1;static final int PROPAGATE = -3;volatile Node prev;volatile Node next;volatile int waitStatus;// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用// 所以这里改名方便下面理解Node shared;final boolean isShared() {return shared == SHARED;}// 省去共享模式下使用不到的相关属性...}
相比于互斥模式,这里节点增加了PROPAGATE状态,并且还增加了shared属性来表明当前阻塞节点是处在共享模式下,还是互斥模式下。
其实共享模式下用不到shared属性,该属性是应用在混合模式下,但是这里为了下面讲解源码方便理解,就提前拿出来说了。
PROPAGATE状态主要是为了解决并发情况下的唤醒丢失问题而提出的,下面会专门有一小节来讲解该状态的作用,这里暂不做解释。
共享模式的实现这里以Semaphore为例进行说明。
获取共享资源
Semaphore获取资源的方法是acquireSharedInterruptibly:
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 如果当前线程被打断了,则抛出打断异常 if (Thread.interrupted())throw new InterruptedException();// 尝试去获取共享资源 -- 如何才算获取共享资源成功,由子类实现决定if (tryAcquireShared(arg) < 0)// 资源数量不足,则进入阻塞阶段doAcquireSharedInterruptibly(arg);}
由子类实现决定剩余资源够不够分配给当前线程:
protected int tryAcquireShared(int acquires) {for (;;) {// Semaphore获取资源时也有公平和非公平两种模式,这里展示的是公平模式的实现// 所谓公平就是抢夺资源前先看看有没有人在排队if (hasQueuedPredecessors())return -1;// remaining小于0则表示正在等待资源的线程数量// 如果remaining>=0 说明资源还够,那么当前线程就可以获取到资源,无需阻塞等待了 // note: remaining是减去当前线程所需资源后的剩余资源数量 int available = getState();int remaining = available - acquires;// 如果剩余资源不足,则直接返回对应负值,表示资源不足需要等待if (remaining < 0 ||// 扣减完后,剩余资源数量大于等于0,则使用cas配合重试确保原子性扣减资源数量成功compareAndSetState(available, remaining))return remaining;}}
如果剩余资源不足,当前线程需要进入资源队列进行等待,这段逻辑存在于doAcquireSharedInterruptibly方法中,当然该方法还包含线程被唤醒后再次尝试抢锁的逻辑:
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 为当前线程创建Node节点尾插到资源队列尾部,节点模式是共享模式final Node node = addWaiter(Node.SHARED);try {for (;;) {// 判断自己是否是头结点的后继节点final Node p = node.predecessor();if (p == head) {// 如果是头结点的后继节点,则尝试再次去获取资源int r = tryAcquireShared(arg);// 与互斥模式不同的一点在于,如果当前线程获取资源成功,那么它会唤醒其后继节点继续尝试获取资源// 后继节点获取成功后,再唤醒它的后继节点,直到资源不足,则停止唤醒if (r >= 0) {// 如果剩余资源数量为0,也会进入到该方法,但是不会执行链式唤醒过程// 只是把当前节点更新为新的头节点setHeadAndPropagate(node, r);p.next = null; // help GCreturn;}}// 根据前驱节点状态判断是否应该阻塞if (shouldParkAfterFailedAcquire(p, node) &&// 如果当前线程是被中断唤醒的,那么直接抛出中断异常parkAndCheckInterrupt())throw new InterruptedException();}} catch (Throwable t) {// 将当前节点状态设置为取消状态cancelAcquire(node);throw t;}}
与互斥模式不同的一点在于,如果共享模式下线程被唤醒后成功获取到资源,当前线程会接着唤醒其后继节点,让其继续获取资源,后继节点在发现还有资源的情况下,继续唤醒其后继节点,直到资源不足,停止继续往后唤醒,整个过程实际是一个链式唤醒的过程。
下面我们进入释放共享资源小节,来看看链式唤醒的整个过程是如何发生的。
释放共享资源
releaseShared方法负责完成对资源的释放,释放完资源后唤醒等待中的线程:
public final boolean releaseShared(int arg) {// 资源怎样算释放成功这是由子类决定的 if (tryReleaseShared(arg)) {// 如果资源释放成功,则进入链式唤醒过程doReleaseShared();return true;}return false;}
tryReleaseShared方法中采用cas+重试确保资源数原子性累加成功:
protected final boolean tryReleaseShared(int releases) {// 因为可能存在多个线程同时尝试归还资源,因此这里为了确保能够原子性累加成功// 采用cas+重试机制实现for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
doReleaseShared方法开启链式唤醒过程:
private void doReleaseShared() {for (;;) {Node h = head;// 确保资源队列不为空if (h != null && h != tail) {int ws = h.waitStatus;// 确保头结点存在后继节点if (ws == Node.SIGNAL) {// 设置头结点的状态值为0,如果cas设置失败,说明可能存在多个线程同时尝试开启链式唤醒if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))continue; // loop to recheck cases// 唤醒头结点的后继节点unparkSuccessor(h);}// 存在并发唤醒的情况处理 --- 具体可以看唤醒丢失小节else if (ws == 0 &&!h.compareAndSetWaitStatus(0, Node.PROPAGATE))continue; // loop on failed CAS}// 头结点没有变动的情况下,说明没有竞争唤醒问题,则跳出循环// 否则尝试继续唤醒变动后的头结点的下一个后继节点if (h == head) // loop if head changedbreak;}}
被唤醒的线程会在doAcquireSharedInterruptibly方法的for循环中继续尝试去获取资源,如果获取成功,并且还有剩余资源,则唤醒自己的后继节点:
private void setHeadAndPropagate(Node node, int propagate) {// 更新当前成功获取资源的节点为新的头结点Node h = head;setHead(node);// propagate代表剩余资源剩余数量,只有在还有剩余资源的情况下才会去唤醒自己的后继节点// h保留的是旧的头结点,如果旧的头结点状态值为PROPAGATE,则再尝试唤醒其后继节点,去获取一下资源试试if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;// 如果当前节点是链表尾节点,或者后继节点是共享类型节点,则尝试进行唤醒if (s == null || s.isShared())doReleaseShared();}}
如果被唤醒的线程获取资源失败,则会进入shouldParkAfterFailedAcquire将头结点状态由0或者PROPAGATE更新为SIGNAL,然后自己再尝试获取一次资源:
如果不考虑并发唤醒的问题,那么整个链式唤醒过程如下图所示:
唤醒丢失问题
并发唤醒会使得整个问题复杂起来,为了解决这个问题AQS引入了PROPAGATE状态,在JDK 6之前是没有这个状态的,当时的Semphore实现存在一个bug,如下图所示:
- bug: https://bugs.openjdk.java.net/browse/JDK-6801020
- fix: https://github.com/openjdk/jdk8u/commit/b63d6d68d93ebc34f8b4091a752eba86ff575fc2
- t3 调用 tryReleaseShared 方法释放 1 个许可,然后调用 unparkSuccessor 方法将 head.waitStatus 由 SIGNAL 改为 0,并唤醒后继节点 t1 后退出
- t1 被 t3 唤醒,调用 tryAcquireShared 方法获取到许可并返回 0(此时还未调用 setHeadAndPropagate 方法中的 setHead 方法将自己设置为新 head)
- t4 调用 tryReleaseShared 方法释放 1 个许可,因为 head 未改变,因此 head.waitStatus 仍为 0,这导致 t4 退出,不会继续调用 unparkSuccessor 方法唤醒后继节点 t2
- t1 继续调用 setHeadAndPropagate 方法,首先将自己设置为新 head,然后因为 tryAcquireShared 方法返回 0 导致 t1 退出,不会继续调用 unparkSuccessor 方法唤醒后继节点 t2
至此,t2 永远不会被唤醒,问题产生。
上述bug产生的原因就是因为共享锁的获取和释放在同一时刻很可能会有多条线程并发执行,这就导致在这个过程中可能会产生这种 waitStatus 为 0 的中间状态,而AQS会将waitStatus状态值为0看做是资源队列为空的信号,因此此时产生的尴尬问题就是明明还有资源,但是确无法唤醒等待队列中的线程,因此通过引入 PROPAGATE 状态来解决这个问题。
为了解决这个问题为头结点引入了PROPAGATE状态,此时我们再来看引入PROPAGATE状态后的流程是怎样的:
- t3 调用 tryReleaseShared 方法释放 1 个许可,然后调用 doReleaseShared 方法将 head.waitStatus 由 SIGNAL 改为 0,并唤醒后继节点 t1 后退出
- t1 被 t3 唤醒,调用 tryAcquireShared 方法获取到许可并返回 0(此时还未调用 setHeadAndPropagate 方法中的 setHead 方法将自己设置为新 head)
- t4 调用 tryReleaseShared 方法释放 1 个许可,因为 head 未改变,因此 head.waitStatus 仍为 0,然后调用 doReleaseShared 方法将 head.waitStatus 由 0 改为 PROPAGATE 后 t4 退出
- t1 继续调用 setHeadAndPropagate 方法,首先将自己设置为新 head,因为此时旧 head.waitStatus 为 PROPAGATE 且同步队列中 t1 还有后继节点 t2,所以继续调用 doReleaseShared 方法,将 head.waitStatus 由 SIGNAL 改为 0,并唤醒后继节点 t2 后退出
后继节点 t2 被唤醒,问题解决。
引入 PROPAGATE 状态还有一个好处:加速唤醒后继节点。
doReleaseShared
方法中有这个条件判断:
if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;
如果没有 PROPAGATE
状态,当多条线程同时运行到这里后,可能就直接退出了,虽然这时有个线程正在调用 unparkSuccessor
方法去唤醒后继节点,但唤醒后的线程也需要等到获取到锁且成为头节点后才能调用 doReleaseShared
方法再去唤醒后继节点。
当并发大时,在这个过程中很有可能会有新节点入队并满足唤醒条件,所以有了 PROPAGATE
状态,当多条线程同时运行到这里后,CAS 失败后的线程可以再次去循环判断能否唤醒后继节点,如果满足唤醒条件就去唤醒。
毕竟,调用 doReleaseShared
方法越多、越早就越有可能更快的唤醒后继节点。
线程t6先入队阻塞,然后线程t4和t5释放资源,如果没有上面那句判断,那么就不会将头结点设置为PROPAGATE状态,也就不会去唤醒t2和t6了。
小总结
AQS共享模式的实现到此就讲解完毕了,整体而言,除了链式唤醒那块比较难搞之外,其他部分还是很容易理解的,下面简单总结一下共享模式下抢夺资源和释放资源的步骤,首先是抢夺资源的步骤:
- 尝试获取资源
- 获取失败则入队阻塞,获取成功直接返回
其次是释放锁步骤:
- 释放资源,累加资源计数
- 判断头结点是否存在后继结点,如果存在则设置当前头结点状态为0,,然后唤醒其后继结点,当前线程结束工作
- 被唤醒的线程尝试去获取资源,如果获取失败,则更正头结点状态为SIGNAL,然后自己再尝试获取一次,还是不行就把自己挂起
- 如果获取成功,设置当前Node为新的头结点,然后判断是否还有剩余资源,如果有的话就唤醒自己的后继节点
- 后继节点重复链式唤醒步骤,直到资源不足或者等待队列为空
链式唤醒过程的讲述不包含并发唤醒处理步骤,这一块比较复杂,大家可以参考唤醒丢失小节自行总结。
共享模式的实现中用到的并发技巧还是CAS+重试,全程并没有使用到睡眠锁。
混合模式
当AQS运行在混合模式下时,此时Node节点的结构如下图所示:
static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED = 1;static final int SIGNAL = -1;static final int PROPAGATE = -3;volatile Node prev;volatile Node next;volatile int waitStatus;// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用// 所以这里改名方便下面理解Node shared;final boolean isShared() {return shared == SHARED;}// 省去混合模式下使用不到的相关属性...}
混合模式的实现这里以ReentrantReadWriteLock读写锁为例进行说明。
读写锁场景下,我们主要需要关心读写互斥的问题,首先读读肯定是并发的,但是读写,写写操作必须是互斥的;还有几个问题我们也需要考虑一下:
- 线程1持有读锁,等待队列中有线程2在等待写锁,如果此时线程3来请求读锁,请问线程3时需要等待还是直接获取读锁呢?
- 线程1持有读锁,此时线程1又想要去获取写锁,请问在此场景下锁升级操作是否被允许发生呢?
- 线程1持有写锁,此时线程1又想要去获取读锁,请问在此场景下锁降级操作是否被允许发生呢?
这里我先回答第2和第3个问题:
- 读锁不能直接升级为写锁,而写锁可以降级为读锁
- 为什么读锁不能升级为写锁:
- 当多个线程同时持有读锁时,它们可以并发地读取数据,因为读操作不会影响数据的一致性。
- 然而,如果读锁能够直接升级为写锁,那么问题就会变得复杂。如果一个线程在持有读锁的情况下升级为写锁,那么其他线程可能正在读取数据,升级为写锁后,其他线程可能会在不知情的情况下访问到无效或不一致的数据,从而破坏了读锁的目的。因此,为了保证数据的一致性,一般情况下,读锁不能直接升级为写锁。
- 为什么写锁可以降级为读锁:
- 写锁可以降级为读锁是因为写锁保证了数据的一致性,而降级为读锁不会引起数据一致性的问题。当一个线程持有写锁时,它是独占访问资源的,其他线程无法读取或写入数据。如果此时没有其他写操作需要进行,写锁可以降级为读锁,允许其他线程并发地读取数据,因为读操作不会影响数据的一致性。
关于第一个问题的答案,我们将在接下来的代码解析环节揭晓。
获取写锁
我们首先来看读写锁实现中获取写锁的方法实现:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
AQS提供的acquire模版方法就不多讲了,大家可以自行回忆整个过程,其中我们来看看读写锁是如何判断写锁是否获取成功的这个过程:
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();// state的前半部分记录读锁数量,后半部分记录写锁数量int c = getState();int w = exclusiveCount(c);// 如果存在读锁或者写锁,c!=0的条件都会满足if (c != 0) {// 只有存在写锁并且是当前线程持有的写锁,下面这个条件才不会满足// 否则说明当前写锁获取失败if (w == 0 || current != getExclusiveOwnerThread())return false;// 如果是当前线程自己的写锁重入,则判断是否重入次数过多 if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// Reentrant acquire// 累加重入次数setState(c + acquires);return true;}// 此时不存在任何锁,那么为什么还要判断当前写锁是否应该阻塞呢?// 因为如果多个线程同时执行到当前逻辑,那么可能已经有线程已经抢到锁了,然后某几个线程已经入队等候了// 当前线程执行到此处时发生上下文切换,再次切换回来执行时,正好碰上锁被释放// 判断逻辑: 当前是公平还是非公平锁,如果是公平锁,则看是否有人排队等候锁,否则该函数永远返回falseif (writerShouldBlock() ||// cas尝试去获取锁,如果失败了则返回false!compareAndSetState(c, c + acquires))return false;// 抢锁成功,设置锁的占有线程为当前线程 setExclusiveOwnerThread(current);return true;}
如果当前存在锁,并且不属于自身写锁可重入场景,则表明抢锁失败,当前线程需要入队阻塞。
如果不存在锁,则CAS尝试抢锁,失败了还是入队阻塞。
关于判断写锁是否应该阻塞的公平和非公平实现如下:
static final class NonfairSync extends Sync {final boolean writerShouldBlock() {return false; // writers can always barge}...}static final class FairSync extends Sync {final boolean writerShouldBlock() {return hasQueuedPredecessors();}...}
释放写锁
读写锁实现中释放写锁的代码如下所示:
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
AQS提供的release模版方法这里不再多说,下面来看看tryRelease方法是如何完成写锁的释放流程的:
protected final boolean tryRelease(int releases) {// 要确保持有锁的线程是自己if (!isHeldExclusively())throw new IllegalMonitorStateException();int nextc = getState() - releases;// 判断写锁计数递减后是否为0boolean free = exclusiveCount(nextc) == 0;// 不为0说明存在锁重入if (free)setExclusiveOwnerThread(null);setState(nextc);return free;}
获取读锁
读写锁实现中获取读锁的代码如下所示:
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}
AQS提供的acquireShared模版方法这里不再多说,下面来看看tryAcquireShared方法是如何完成读锁的获取流程的:
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();// 考虑读锁升级写锁场景: 如果当前存在写锁并且不是当前线程只有的写锁,那么获取读锁失败if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 获取读锁个数 int r = sharedCount(c);// 如果当前读操作需要被阻塞,那么需要到fullTryAcquireShared函数中检查是否真的需要阻塞if (!readerShouldBlock() &&// 确保读锁数量没有达到上限r < MAX_COUNT &&// 尝试累加全局读锁计数compareAndSetState(c, c + SHARED_UNIT)) {// 首个获取读锁的线程单独记录if (r == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {// 其他获取读锁的线程使用ThreadLocal记录其获取的读锁数量HoldCounter rh = cachedHoldCounter;if (rh == null ||rh.tid != LockSupport.getThreadId(current))// cachedHoldCounter记录最新一次获取读锁的线程的HoldCountercachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}// 这里存在三种情况会进入下面这个方法的逻辑:// 1. readerShouldBlock方法判断当前读锁获取应该阻塞等待// 2. 读锁获取数量达到最大值// 3. 尝试原子性累加全局读锁计数失败return fullTryAcquireShared(current);}
fullTryAcquireShared方法负责处理以下几种场景:
- 如果发现写锁不为空,要确保持有写锁的线程始终是当前线程自己,否则获取读锁失败,返回 - 1 ,表示当前线程需要阻塞
- readerShouldBlock()方法返回true时,表示当前线程需要入队阻塞,但是如果当前线程已经持有写锁或读锁,它仍然可以继续执行,如果当前线程不持有锁,那么返回 -1 ,表示当前线程需要阻塞
- 检查读锁获取的总次数是否超过最大限制
- cas原子性的累加读锁计数器,如果失败,则重复以上检查过程,最后再次尝试cas原子累加
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null;for (;;) {//===================阶段1: 前置检查 =======================int c = getState();// 如果发现写锁不为空,要确保持有写锁的线程始终是当前线程自己,否则获取读锁失败if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1;// 判断当前线程是否需要入队等待} else if (readerShouldBlock()) {// 如果当前线程还没有持有读锁,那么情况当前线程的threadLocal本地缓存中记录的读取计数器if (firstReader != current) {if (rh == null) {rh = cachedHoldCounter;if (rh == null ||rh.tid != LockSupport.getThreadId(current)) {rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}// 如果当前线程不持有任何锁,则返回-1,表示当前线程需要入队阻塞// 如果当前线程持有读锁,那么即便readerShouldBlock返回true,这里也不会阻塞// 为了防止死锁的发生if (rh.count == 0)return -1;}}// 如果读锁获取的总次数达到最大限制,则抛出异常if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");//===================阶段2: 累加计数器=======================// 累加全局读锁计数 if (compareAndSetState(c, c + SHARED_UNIT)) {// 说明当前线程是第一个获取读锁的线程if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {// 处理获取读锁的首位线程的读锁重入firstReaderHoldCount++;} else {// 普通情况处理if (rh == null)// 先获取最近一次获取读锁的线程的HoldCounterrh = cachedHoldCounter;// 如果最近一次获取读锁的线程记录不存在,或者其记录的不是当前线程 if (rh == null ||rh.tid != LockSupport.getThreadId(current))// 从线程自己的本地缓存获取自己的HoldCounterrh = readHolds.get();// 如果最近一次获取读锁的线程就是当前线程,那么累加当前线程本地读锁计数器 else if (rh.count == 0)readHolds.set(rh);rh.count++;// 记录最近一次获取读锁的线程的读锁计数器cachedHoldCounter = rh; // cache for release}// 返回1表示获取读锁成功return 1;}}}
fullTryAcquireShared方法大体分为两个阶段,前置检查和累加计数,其中前置检查阶段需要特别注意一点:
- 即使 readerShouldBlock 返回 true,如果当前线程已经持有写锁或读锁,它仍然可以继续执行
- readerShouldBlock()那段if…else中没有检查写锁是否为0,是因为写锁的检查已经在第一个if块中进行了检查
- 这种设计的主要目的是避免因为同一个线程在持有锁时被阻塞,导致整个程序出现死锁的情况。如果允许同一个线程在持有锁时被阻塞,那么在某些情况下,可能会导致线程间的循环依赖,从而引发死锁。
读锁是否应该阻塞
关于读锁是否应该阻塞这个问题,其公平版本实现还是判断等待队列中是否存在线程正在等待,这一点和之前一样,关键是其非公平版本实现,判断逻辑:
- 如果等待队列头结点的后继节点是独占模式,也就是当前阻塞线程期望获取的是写锁,那么当前读锁需要阻塞
- 否则,当前读锁无需阻塞
static final class NonfairSync extends Sync {final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();}}static final class FairSync extends Sync {final boolean readerShouldBlock() {return hasQueuedPredecessors();}}final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;return (h = head) != null &&(s = h.next) != null &&!s.isShared() &&s.thread != null;}
关于读锁的非公平版本实现主要是考虑到写锁的饥饿性问题:
- Writer Starvation(写线程饥饿):这是指在一个读写锁的多线程环境中,由于读线程频繁获取锁,写线程可能无法获取到写锁,从而长时间被阻塞。这可能导致写线程无法完成关键的操作,从而影响程序的性能和响应性。
- 启发式方法:为了避免写线程饥饿,这段代码注释中提到了一种启发式方法。在这个方法中,当一个线程出现在队列的队首(appears to be head of queue),且如果这个线程是一个正在等待写锁的线程(waiting writer),则当前线程会被阻塞。
- 概率效应:需要注意的是,这只是一个概率性的效应。这是因为即使出现一个等待写锁的线程在队首,一个新的读线程不会被阻塞,如果在它之前有其他的启用的读线程还没有从队列中出来。这是为了在一定程度上平衡读写线程的获取锁的机会,避免写线程长时间被阻塞。
大家可以回想一下本篇开始提出的问题一,相信大家已经都知晓答案了。
释放读锁
读写锁实现中释放读锁的代码如下所示:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
AQS提供的releaseShared模版方法这里不再多说,下面来看看tryReleaseShared方法是如何完成读锁的释放流程的:
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();// 如果当前线程是第一个获取读锁的线程if (firstReader == current) {// assert firstReaderHoldCount > 0;// 如果读锁没有出现可重入获取,直接将firstReader置空if (firstReaderHoldCount == 1)firstReader = null;else// 递减读锁获取次数,等到为0时,才算读锁被真正释放firstReaderHoldCount--;} else {// 通过缓存拿到最近一次获取读锁的线程的HoldCounter计数HoldCounter rh = cachedHoldCounter;// 如果缓存中没有,或者缓存记录的不是当前线程的读计数器,那么从当前线程本地缓存中获取if (rh == null ||rh.tid != LockSupport.getThreadId(current))rh = readHolds.get();// 如果读锁获取计数器=1,则直接释放当前读锁int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}// 否则递减计数器--rh.count;}// 通过cas加重试确保全局计数器递减成功for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.// nextc == 0 条件满足说明读锁和写锁计数器都为0return nextc == 0;}}
小总结
关于混合模式的实现,其本身结合使用了互斥模式和共享模式,因此锁队列中既存在类型为独占模式的Node,也存在类型为共享模式的Node,如下图所示:
- 获取写锁时,如果当前存在锁,并且不属于自身写锁可重入场景,则抢锁失败,当前线程入队阻塞,Node节点类型为独占模式;如果不存在锁,则CAS尝试抢锁,失败了还是入队阻塞。
- 释放写锁时,判断锁计数递减后是否为0,如果是则说明锁释放成功,然后唤醒当前头节点的后继节点,否则跳过唤醒阶段
- 获取读锁时,分为以下几类场景:
- 存在写锁,但是写锁不是自己的,则获取读锁失败,返回 -1 ,当前线程需要阻塞
- 存在写锁, 写锁时自己的,此时属于写锁降级为读锁的场景,累加读锁计数,然后返回1,表示获取读锁成功
- 不存在写锁,此时readerShouldBlock方法返回true,表示当前线程需要入队阻塞,但是如果当前线程此时已经持有读锁,它仍然可以继续执行,否则返回-1,表示当前线程需要阻塞
- 如果读锁获取的总次数超过最大限制,抛出异常
- cas原子性的累加读锁计数器,如果失败,则重复以上检查过程,如果没问题,继续尝试cas原子累加计数器
- 释放读锁时,先递减当前线程本地缓存的计数器值,然后再递减全局锁计数器的值,只有当全局锁计数器递减后的值为0时,才表明当前锁时真正被完全释放掉了
之前在共享模式篇说过,共享锁的释放流程走的是链式唤醒流程,而在混合模式下,如果链式唤醒过程中遇到了独占节点,也会提前终止链式唤醒流程:
private void setHeadAndPropagate(Node node, int propagate) {Node h = head;setHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;// 唤醒的前提是后继节点必须是共享节点,而不是独占节点if (s == null || s.isShared())doReleaseShared();}}
栅栏模式
当AQS处在栅栏模式下时,此时state表示剩余未到达栅栏处的线程数,先到达栅栏处的线程需要等待直到剩余线程都达到栅栏 , 当state=0时,说明所有线程都到达栅栏处,此时打开栅栏,即唤醒所有线程继续执行
当AQS运行在栅栏模式下时,此时Node节点的结构如下所示:
static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED = 1;static final int SIGNAL = -1;static final int PROPAGATE = -3;volatile Node prev;volatile Node next;volatile int waitStatus;// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用// 所以这里改名方便下面理解Node shared;final boolean isShared() {return shared == SHARED;}// 省去栅栏模式下使用不到的相关属性...}
栅栏模式这里以CountDownLatch为例进行说明,CountDownLatch的构造函数需要传入一个count值表明参与本轮任务的线程数量:
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}
严格来说CyclicBarrier才是栅栏模式的真正实现,CountDownLatch只是栅栏模式的简化版本,和栅栏模式稍有区别在于,CountDownLatch只有一轮循环,也就是所有线程到达栅栏处后,就结束所有线程的执行;而CyclicBarrier会在本轮循环结束后,重置计数器,开启下一轮循环。
等待
主线程调用await方法等待所有线程完成任务到达栅栏处:
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
acquireSharedInterruptibly这个模版方法就不多说了,tryAcquireShared方法实现也很简单,当state递减为0时,表明所有线程都已经到达栅栏处了,本轮结束,主线程可以继续往下执行了:
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
递减计数
当前线程完成自己的任务后,递减计数:
public void countDown() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
releaseShared模版方法这里就不多说了,tryReleaseShared的实现也很简单,通过cas配合重试递减计数器的值,如果计数器的值被递减到0了,说明当前线程是最后一个到达栅栏处的,接下来就可以唤醒等待中的主线程了:
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c - 1;if (compareAndSetState(c, nextc))return nextc == 0;}}
条件变量模式
当AQS运行在条件变量模式下时,此时Node节点的结构如下所示:
static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED = 1;static final int SIGNAL = -1;static final int PROPAGATE = -3;static final int CONDITION = -2;volatile Node prev;volatile Node next;volatile int waitStatus;// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用// 所以这里改名方便下面理解Node shared;final boolean isShared() {return shared == SHARED;}// 如果当前Node处在条件队列中,那么该属性链接当前节点在队列中的下一个节点Node nextWaiter;// 省去共享模式下使用不到的相关属性...}
条件变量模式大家需要注意两点:
- 条件变量需要互斥锁保护
- 此模式下存在两个队列,一个是锁队列,一个是条件队列,如果存在多个条件变量,那么就会有多个条件队列
这两套队列复用一套Node结构
此处还未Node节点新增一个状态CONDITION,此状态用于描述处于条件队列中的节点。
这里以ReentrantLock中的ConditionObject实现来看看条件变量在AQS中是怎么玩的。
等待条件成立
await方法为当前线程创建node节点,并加入对应的条件队列中等待,直到被signal唤醒或者中断唤醒才会结束等待状态:
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 为当前线程创建一个Node节点,然后尾插到条件队列尾部 Node node = addConditionWaiter();// 因为当前线程需要在条件队列中挂起等待,所以挂起前需要完全释放掉当前线程持有的锁// 返回值是阻塞前持有的锁计数int savedState = fullyRelease(node);int interruptMode = 0;// 当条件成立时,会将当前node节点从条件队列移入锁队列while (!isOnSyncQueue(node)) {// 挂起当前线程,等待条件成立LockSupport.park(this);// 被唤醒后检查是否是被中断唤醒的,如果是被中断唤醒的并且转移节点到锁队列成功,返回-1,并跳出循环等待// 如果是被中断唤醒的,但是cas转移节点到锁队列失败了,此时返回1,并跳出循环等待if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 当前节点已经转移到锁队尾排队等候锁了,此时调用acquireQueued把当前线程重新挂起// 等待被唤醒后抢锁,如果失败,继续挂起,直到抢锁成功为止// acquireQueued方法返回值表示等待锁期间是否发生了中断,如果发生了则记录中断标记为REINTERRUPT// 如果阻塞前线程发生了可重入,并重入了3次,那么此处抢锁要确保初始锁计数为3if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 条件满足,当前线程被唤醒,并且唤醒后抢锁成功// 先清理一下队列中处于取消等待状态的节点 if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();// 在条件队列上等待期间,或者锁队列上等待期间发生了中断,则对中断进行分类处理if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
addConditionWaiter方法负责为当前线程创建一个Node节点,然后尾插到条件队列尾部:
private Node addConditionWaiter() {// 确保当前线程持有保护当前条件变量的互斥锁if (!isHeldExclusively())throw new IllegalMonitorStateException();// 清理一下条件队列中处于取消状态的Node节点Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {// 负责遍历整个条件队列链表,删除掉所有处于CANCEL状态的节点unlinkCancelledWaiters();t = lastWaiter;}// 为即将入队的当前线程创建一个Node节点尾插到当前条件队列中Node node = new Node(Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
fullyRelease方法负责完成释放掉当前线程持有的所有锁:
final int fullyRelease(Node node) {try {// 返回先前持有的锁计数int savedState = getState();// 释放掉当前线程持有的锁,如果是可重入场景,也是全部释放掉if (release(savedState))return savedState;throw new IllegalMonitorStateException();} catch (Throwable t) {node.waitStatus = Node.CANCELLED;throw t;}}
checkInterruptWhileWaiting方法在被唤醒后检查是否是被中断唤醒的,如果是说明条件满足被signal唤醒了,然后调用transferAfterCancelledWait将当前线程从条件队列转移到锁队列等候:
private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;}final boolean transferAfterCancelledWait(Node node) {// 将当前节点状态值设置为0if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {// 放入到锁队列尾部enq(node);// 此处返回true,表示是由于中断唤醒导致当前线程由条件队列转移到了锁队列中return true;}/** If we lost out to a signal(), then we can't proceed* until it finishes its enq(). Cancelling during an* incomplete transfer is both rare and transient, so just* spin.*/// 如果上面cas失败,说明SIGNAL操作和中断同时发生,此时被中断唤醒的线程会不断轮询直到SIGNAL操作将节点成功转移到锁队列while (!isOnSyncQueue(node))Thread.yield();// 此处返回false,此时虽然发生了中断,但是最终不是因为中断返回被唤醒,而是signal操作率先完成了唤醒return false;}
reportInterruptAfterWait方法负责判断当前是应该发生一次自我打断还是应该直接抛出中断异常:
private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();}
条件满足,唤醒等待的节点
当条件满足时,调用signal方法唤醒条件队列中第一个节点对应的线程:
public final void signal() {// 确保当前线程持有保护条件变量的互斥锁if (!isHeldExclusively())throw new IllegalMonitorStateException();// 唤醒条件队列中等待的所有线程Node first = firstWaiter;if (first != null)doSignal(first);}
doSignal方法负责遍历条件队列中每个节点,每遍历到一个节点先检查是否是被CANCEL了的节点,如果是则跳过,继续遍历下一个节点,如果不是则将当前节点从条件队列转移到锁队列中,并唤醒对应的线程,然后结束返回:
private void doSignal(Node first) {do {// 当前节点移出条件队列if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}
transferForSignal方法负责将当前节点加入锁队列中,同时恢复挂起了的线程:
final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/// 如果cas失败,说明当前Node已经被取消了if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/// 将当前节点加入锁队列尾部--返回旧的尾节点Node p = enq(node);int ws = p.waitStatus;// 将旧的尾节点的状态值设置为SIGNAL,因为其存在新的后继节点需要唤醒if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))// 恢复挂起的线程LockSupport.unpark(node.thread);return true;}
小总结
条件变量的实现这块,需要将await和signal两部分连起来看才能看明白,这里我给出一副流程图来帮助大家理解等待唤醒过程,首先来看一下await的过程:
虽然await方法有很多代码,但是大部分代码都服务于唤醒后的处理操作。
下面来看一下signal的整个过程:
后续的流程就是互斥模式下阻塞等待锁的流程了,当t3线程抢到锁后,会判断在锁队列上阻塞等待期间是否发生了中断,如果是则会补充一次自我中断。
上面描述的是通过signal唤醒等待中的线程,还有一种是线程阻塞过程中被打断了,此时可能会发生如下两种情况:
- 线程被中断唤醒后,尝试CAS更新节点状态从CONDITION到0成功,此时interruptMode会被设置THROW_IE标记
当后续线程成功获取到锁被唤醒后,发现interruptMode被设置了THROW_IE标记,会抛出中断异常,表明此次返回并非条件为真返回,而是因为中断唤醒返回。
- 线程被中断唤醒后,尝试CAS更新节点状态从CONDITION到0失败,说明SIGNAL操作和中断同时发生,此时被中断唤醒的线程会不断轮询直到SIGNAL操作将节点成功转移到锁队列
后续的流程就是互斥模式下阻塞等待锁的流程了,当t3线程抢到锁后,会补充一次自我中断。此时虽然发生了中断,但是最终不是因为中断返回被唤醒,而是signal操作率先完成了唤醒,但是等待期间发生了中断,所以补充了一次自我中断。
小结
Java AQS抽象队列同步器的大致设计思路也就讲述的差不多了,后半部分由于写作时间比较紧张,所以可能越到后面讲的就越草率了,如果有没看懂的地方,或者发现笔者写作错误可以在评论区指出或者私信与我讨论。
相关文章:

Java的AQS框架是如何支撑起整个并发库的
如何设计一个抽象队列同步器 引言AQS需要解决哪些场景下的问题互斥模式获取锁抢锁失败入队 释放锁小总结 共享模式获取共享资源释放共享资源唤醒丢失问题 小总结 混合模式获取写锁释放写锁获取读锁读锁是否应该阻塞 释放读锁小总结 栅栏模式等待递减计数 条件变量模式等待条件成…...

一.net core 自动化发布到docker (Jenkins安装)
目录 1.安装Jenkins 参考资料:https://www.jenkins.io/doc/book/installing/docker/#downloading-and-running-jenkins-in-docker 1.Open up a terminal window.(打开一个终端窗口。) 2.Create a bridge network in Docker using the following docker network create comma…...

二刷LeetCode--148. 排序链表(C++版本),必会题,思维题
思路,本题其实考察了两个点:合并链表、链表切分。首先从1开始,将链表切成一段一段,因为需要使用归并,所以下一次的切分长度应该是当前切分长度的二倍,每次切分,我们拿出两段,然后将第…...

css flex 上下结构布局
display: flex; flex-flow: column; justify-content: space-between;...

win下qwidget全屏弹窗后其他窗口鼠标样式无法更新的问题
在win平台下,实现截取选桌面执行推理功能,用一个qwidget(j对象名为m_selectWidget)来显示选取范围的边框,但这个qwidget显示后,其他窗口在他下面可以接受鼠标相应的事件,但原来的鼠标形状功能失效(mac正常&…...

Java【数据结构】二分查找
🌞 题目: 🌏在有序数组A中,查找目标值target 🌏如果找到返回索引 🌏如果找不到返回-1 算法描述解释前提给定一个内含n个元素的有序数组A,满足A0<A1<A2<<An-1,一个待查值target1设…...

数据库技术--数据库引擎,数据访问接口及其关系详解(附加形象的比喻)
目录 背景数据库引擎Jet数据库:ISAM:ODBC(Open Database Connectivity): 数据访问接口ADO(ActiveX Data Objects)DAO(Data Access Objects)RDO(Remote Data O…...

【BASH】回顾与知识点梳理(三十三)
【BASH】回顾与知识点梳理 三十三 三十三. 认识系统服务 (daemons)33.1 什么是 daemon 与服务 (service)早期 System V 的 init 管理行为中 daemon 的主要分类 (Optional)systemd 使用的 unit 分类systemd 的配置文件放置目录systemd 的 unit 类型分类说明 33.2 透过 systemctl…...

同步请求和异步请求
同步请求和异步请求是在网络编程中常用的两种通信模式,它们有以下区别: 同步请求: 在发送一个请求后,程序会一直等待服务器返回响应,期间无法进行其他操作。请求发出后,程序会阻塞在请求处,直…...

Transformer是什么,Transformer应用
目录 Transformer应用 Transformer是什么 Transformer应用:循环神经网络 语言翻译:注重语句前后顺序 RNN看中单个特征; CNN:看中特征之间时序性 模型关注不同位置的能力 Transformer是什么 Transformer是一个利用注意力机制来提高模型训练速度的模型。关于注意力机…...

故障011:dmap服务缺失libnsl.so修复
故障011:dmap服务缺失libnsl.so修复 1. 问题描述2. 解决方法2.1 初步分析2.2 动手实操2.2.1 模糊搜索大法2.2.2 僵桃代李大法 DM技术交流QQ群:940124259 1. 问题描述 今天遇二期XC环境,达梦DM 7.6的DmAPService备份辅助进程服务无法启动&a…...

第十三章 SpringBoot项目(总)
1.创建SpringBoot项目 1.1.设置编码 1.4.导入已有的spring boot项目 2.快速搭建Restfull风格的项目 2.1.返回字符串 RestController public class IndexController {RequestMapping("/demo1")public Object demo1() {System.out.println("demo1 ran...."…...

利用Python隧道爬虫ip轻松构建全局爬虫网络
嘿,爬虫程序员们!你们有没有碰到过需要大规模数据爬取的情况?也许你们之前遇到过网站的反爬措施,卡住你们的进度。别担心,今天我来分享一个利用Python隧道爬虫ip实现的方法,帮助你们轻松搭建全局爬虫ip网络…...

Spring Clould 网关 - Gateway
视频地址:微服务(SpringCloudRabbitMQDockerRedis搜索分布式) Gateway网关-网关作用介绍(P35) Spring Cloud Gateway 是 Spring Cloud 的一个全新项目,该项目是基于 Spring 5.0,Spring Boot 2…...

PHP使用phpmailer及SMTP服务实现邮件发送
博客升级中,把之前没有想到的功能一点点的完善。 这篇日志记录一下,使用phpmailer实现邮件发送的这样一个操作。 博客偶尔会有留言和评论,我也会及时回复,但是有一个问题,我回复了,给我留言的人如果不再次…...

交换实验一
题目 交换机上接口配置 SW1 interface GigabitEthernet0/0/1 port hybrid tagged vlan 2 port hybrid untagged vlan 3 to 6 interface Ethernet0/0/2 port hybrid pvid vlan 3 port hybrid untagged vlan 2 to 6 interface Ethernet0/0/3 port link-type access port d…...

计算机中丢失MSVCR120.dll,找不到MSVCR120.dll是什么意思?
当计算机中缺少MSVCR120.dll文件时,意味着缺少了Microsoft Visual C Redistributable文件的一个组件。MSVCR120.dll是Visual C Redistributable 2013的动态链接库文件,它是应用程序依赖的重要文件之一。缺少MSVCR120.dll文件可能会导致一些应用程序无法正…...

avue多选列表根据后端返回的某个值去判断是否选中;avue-curd多选回显
效果如上: getSiteList().then(res > {//列表数据this.siteData res.data.datathis.$nextTick(()>{this.siteData.forEach(item>{//业务条件if(item.configid&&item.configid!0&&item.configid>0){//符合条件时调用选中的方法this.$…...

Vue2中根据权限添加动态路由
Vue2中根据权限添加动态路由 大概记录一下主要代码 1.根据后端返回的路由列表生成左侧菜单(后端返回的数据结构中用id和pid来区别包含关系) 大概结构如下: 2.前端需要处理成包含children的树形结构 //动态生成菜单 export const gener…...

搭建 Python 环境 | Python、PyCharm
计算机 计算机能完成的工作: 算术运算逻辑判断数据存储网络通信…更多的更复杂的任务 以下这些都可以称为 “计算机”: 一台计算机主要由以下这几个重要的组件构成 CPU 中央处理器:大脑,算术运算,逻辑判断 存储器&…...

NPOI 读取和写入Excel
在C#中使用NPOI库读取和写入Excel文件,你需要先下载并安装NPOI库。你可以在NuGet管理器中搜索NPOI并进行安装。 以下是一个使用NPOI库进行Excel文件读取和写入的示例: 读取Excel文件: using NPOI.SS.UserModel; using NPOI.XSSF.UserModel…...

Linux工具【2】(调试器gdb、项目自动化构建工具make/Makefile)
gdb、make/Makefile 引言调试器gdb介绍常用指令 自动化构建工具make/Makefile介绍使用依赖关系与依赖方法编辑Makefile伪目标 总结 引言 在上一篇文章中介绍了Linux中的编辑器vim与编译器gcc与g: 戳我看vim与gcc详解哦 在本篇文章中将继续来介绍Linux中的工具&…...

C++ 网络编程项目fastDFS分布式文件系统(三)-Nginx部分
目录 1. 一些基本概念 1.1 Nginx初步认识 1.2 正向/反向代理 1.3 域名和IP 2. Nginx 安装和配置 2.1 安装 2.2 配置 3. Nginx的使用 3.1 部署静态网页 3.2 反向代理和负载均衡 4 课外知识导读 1. URL和URI 编辑 2. DNS解析过程 1. 一些基本概念 1.1 Nginx初步认…...

Apache-DBUtils
目录 封装方法 引出dbutils 案例 当关闭connection后,resultset结果集就无法使用了,这就使得resultset不利于数据的管理 封装方法 我们可以将结果集先存储在一个集合中,当connection关闭后,我们可以通过访问集合来访问结果集 …...

LangChain手记 Agent 智能体
整理并翻译自DeepLearning.AILangChain的官方课程:Agent(源代码可见) “人们有时会将LLM看作是知识库,因为它被训练所以记住了来自互联网或其他地方的海量信息,因而当你向它提问时,它可以回答你的问题。有一…...

87-基于stm32单片机粮仓仓库环境温湿度烟雾监测报警系统Proteus仿真+源码
资料编号:087 一:功能介绍: 1、采用stm32单片机OLED显示屏烟雾浓度检测DHT11温湿度电机按键蜂鸣器,制作一个温湿度采集、烟雾浓度采集,OLED显示相关数据, 2、通过按键设置温度上限、烟雾浓度上限࿰…...

ChatGPT 调教日记(二):程序员转量化的背景知识
程序员如何学习量化金融 作为一个程序员学习量化金融(quant)是一个不错的选择。以下是一些建议: 学习金融基础知识:了解金融市场、投资策略和金融产品。这将帮助你理解量化金融的背景和应用场景。 学习统计学和数学:…...

什么是网络地址转换 (NAT)
网络地址转换(NAT)是更改源和目标 IP 地址和端口的过程,地址转换减少了对 IPv4 公共地址的需求,并隐藏了专用网络地址范围,该过程通常由路由器或防火墙完成。 NAT是如何工作的 NAT 允许单个设备(如路由器…...

系统架构设计师---事务管理、并发控制、数据库的备份与恢复
目录 事务管理 定义 事务的四个特性(ACID) 相关SQL语句 并发控制...

如何更好的维护自己的电脑?
我的笔记本电脑 我使用的华硕天选3是一款游戏本,搭载了英特尔酷睿i7-12700H处理器,16GB内存,512GB固态硬盘和NVIDIA GeForce RTX 3050显卡。屏幕尺寸为15.6英寸,分辨率为2560x1440。对于日常使用和工作学习娱乐都能满足要求。 日常…...