当前位置: 首页 > article >正文

【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码

👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD

🔥 2025本人正在沉淀中… 博客更新速度++

👍 欢迎点赞、收藏、关注,跟上我的更新节奏

📚欢迎订阅专栏,专栏名《在2B工作中寻求并发是否搞错了什么》

前言

经过上一篇的学习,我们知道了。AQS的基本原理和使用。

【Java并发】【AQS】适合初学者体质的AQS入门

主播觉得,AQS的原理,就是通过这2个队列的协助,实现核心功能,同步队列(CLH队列)和条件队列(Condition队列)。

在这里插入图片描述

同步队列(CLH队列)

  • 作用:管理需要获取锁的线程。当多个线程竞争共享资源时,未获取到锁的线程会被封装成节点,按FIFO顺序加入阻塞队列,等待唤醒后重新尝试获取锁。
  • 解决的问题
    实现锁的公平性线程排队机制。通过CLH队列,AQS可以按顺序唤醒线程(如公平锁),避免线程无休止自旋竞争资源,减少CPU开销。

条件队列(Condition队列)

  • 作用:管理等待特定条件的线程。当线程调用Condition.await()时,会释放锁并进入条件队列;当其他线程调用Condition.signal()时,条件队列中的线程会被转移到阻塞队列,重新参与锁竞争。
  • 解决的问题
    实现线程间的精细化协作(如生产者-消费者模式)。例如:
    • 生产者线程在队列满时,通过条件队列挂起,而非占用锁空等。
    • 消费者线程消费数据后,通过signal()唤醒生产者,解耦等待条件与锁竞争。

下面,主播会通过ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier的源码的角度来分析。它们都是基于AQS的实现。带大家看看AQS到底有啥?

以ReentrantLock的角度看AQS独占实现

ReentrantLock的简单使用

简单看下ReentrantLock怎么使用的独占锁。

public class SimpleLockDemo {static ReentrantLock lock = new ReentrantLock(); // 1. 创建锁对象static int count = 0; // 共享资源public static void main(String[] args) throws InterruptedException {Runnable task = () -> {lock.lock();          // 2. 加锁try {count++;         // 3. 操作共享资源} finally {lock.unlock();  // 4. 解锁(必须执行)}};Thread t1 = new Thread(task);Thread t2 = new Thread(task);t1.start();t2.start();t1.join();t2.join();System.out.println("结果: " + count); // 输出 2}
}

ReentrantLock独占锁源码分析

这里多说下ReentrantLock,公平锁和非公平锁吧!

其实非公平就是多了一步,setExclusiveOwnerThread将当前线程所有者改为当前线程。

这个exclusiveOwnerThread字段,是AQS继承AbstractOwnableSynchronizer来的字段。

state字段是AQS定义的。在ReentrantLock中,这个state0就是没有线程获锁,1就是有线程获取到锁。

// 非公平锁  ReentrantLock.NonfairSync
final void lock() {// 尝试获取锁,将state由0改1if (compareAndSetState(0, 1))// 把锁的线程的所有者exclusiveOwnerThread字段,设置为当前线程setExclusiveOwnerThread(Thread.currentThread()); elseacquire(1);	// 抢锁失败,放入阻塞队列
}// AbstractOwnableSynchronizer#setExclusiveOwnerThread
protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;
}// 公平锁  ReentrantLock.FairSync
final void lock() {acquire(1);
}

下面是AQS的独占锁具体逻辑:

在这里插入图片描述

首先是执行子类(ReentrantLock)的实现:

  1. tryAcquire方法,尝试再获取锁1次。
  2. addWaiter将当前线程封装为Node,加入CLH队列。
  3. acquireQueued将线程挂起。
public final void acquire(int arg) {if (!tryAcquire(arg) && 	// 1.tryAcquire调用子类实现acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // AQS实现,加入同步队列等待selfInterrupt();		// 线程中断复位
}

tryAcquire

tryAcquire方法,由于子类具体实现,下面是公平锁的实现源码:

// 不公平的实现
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}// nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {// 获取当前线程final Thread current = Thread.currentThread();// 获取当前状态,ReentrantLock中 0是锁没有被抢占,1是已经被其他线程抢占了int c = getState();// 如果锁没有被抢占if (c == 0) {// cas尝试抢占,state由0改到1if (compareAndSetState(0, acquires)) {// 把锁的线程的所有者exclusiveOwnerThread字段,设置为当前线程setExclusiveOwnerThread(current);return true;}}// 可重入锁的情况,如果持有锁的线程为当前线程else if (current == getExclusiveOwnerThread()) { // 线程重入数量+1,ReentrantLock独占,这里的acquires就是1int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");// 修改state的状态setState(nextc);return true;}return false;
}

公平锁的实现,和非公平不一样的地方在hasQueuedPredecessors方法这里,hasQueuedPredecessors方法的作用是判断当前线程是否排队等待获取锁

🍪这里我们不展开说,主播把大家当初学者来看,现在你并不知道同步队列的结构,主播会在下面的Semaphore源码分析的时候,再说这个东西。你现在只需要知道这个方法是用来判断当前线程是否排队等待获取锁

protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 当前线程不需要等待获取锁 且 cas获取锁成功if (!hasQueuedPredecessors() &&		compareAndSetState(0, acquires)) {// 把锁的线程的所有者exclusiveOwnerThread字段,设置为当前线程setExclusiveOwnerThread(current);return true;
...

addWaiter

创建一个独占的Node,并将它放入同步队列中。

在这里插入图片描述

// 独占模式这里是mode是Node.EXCLUSIVE。值为null
private Node addWaiter(Node mode) {// 创建1个独占模式的 Node。在同步队列中nextWaiter字段用来区分是独占还是共享模式。// waitStatus初始值就是0,Node.EXCLUSIVE值为nullNode node = new Node(Thread.currentThread(), mode);// pred赋值为当前同步队列的tailNode pred = tail;// 如果当前同步队列有tail(就是已经构建过同步队列了)if (pred != null) {// 当前要加入同步队列Node的前序,指向同步队列的尾部node.prev = pred;// cas将同步队列的tail赋值为当前要加入的Nodeif (compareAndSetTail(pred, node)) {// 同步队列的tail的下一个Node赋值为当前要加入的Nodepred.next = node;return node;}}// 没有构建过同步队列,node入队enq(node);return node;
}Node(Thread thread, Node mode) {     // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;
}

enq方法

private Node enq(final Node node) {// cas修改for (;;) {// 临时Node赋值为tail节点Node t = tail;// 当前同步队列没有tail节点(说明没有初始化过)if (t == null) { // cas将head节点设置为新创建的节点(注意,这里是new的Node,不是入参的Node)if (compareAndSetHead(new Node()))tail = head;	// 将tail赋值为head} else {// ====== 下面是同步队列初始化过的逻辑 ========// 要加入同步队列的node的prev设置为tail节点node.prev = t;// cas将tail节点,由t设置为要加入同步队列的nodeif (compareAndSetTail(t, node)) {// tail节点的下一个节点赋值为当前要加入node的节点t.next = node;return t;}}}
}

acquireQueued

在这里插入图片描述

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 获取当前node的前一个nodefinal Node p = node.predecessor();// 前一个node为头节点 且 再次尝试获取1次成功if (p == head && tryAcquire(arg)) {// 当前要加入的节点设置为头节点setHead(node);// 前一个node的next设置为null(方便当前node的前驱被gc回收)p.next = null; failed = false;return interrupted;}// 抢占失败,判断是否要将线程挂起if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);	// 因为抢占异常,将等待状态设置为CANCELLED}
}// setHead方法
private void setHead(Node node) {// 同步队列头节点设置为当前要加入的nodehead = node;// 当前要加入的节点的线程,设置为nullnode.thread = null;// 当前要加入的节点的前驱,设置为nullnode.prev = null;
}

下面是判断是否要将线程挂起shouldParkAfterFailedAcquire,和具体挂起线程parkAndCheckInterrupt的代码方法。

// 判断是否要将线程挂起
// 入参:
// - pred:当前要加入的CLH队列节点的前驱节点,下面会称为前驱节点
// - node:当前要加入的CLH队列的节点,下面会称为当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 前驱节点的等待状态int ws = pred.waitStatus;// 判断当前节点,是否为Node.SIGNAL(值为-1)// Node.SIGNAL表示,pred节点释放后,会通知node,当前线程可以安心的挂起。if (ws == Node.SIGNAL)return true;// 当前节点的状态为CANCELLED(值为1),说明前驱节点已因超时/中断被取消if (ws > 0) {// 前驱节点向前找,将当前节点的前驱设置为,不为CANCELLED状态的节点。do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);// 前驱节点的后继节点设置为当前节点。pred.next = node;} else {// cas将前驱节点的等待状态设置为Node.SIGNAL。compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}// 中断线程
private final boolean parkAndCheckInterrupt() {// 将当前线程挂起LockSupport.park(this);// 唤醒后执行下面的代码return Thread.interrupted();
}

什么情况下,节点的等待状态会变成CANCELLED呢?

  1. 线程被中断:当线程在acquire过程中被中断(调用Thread.interrupt()),会触发cancelAcquire方法将节点状态设为CANCELLED
  2. 超时未获锁:在doAcquireNanos等带超时的获取方法中,若超时仍未获得锁,会通过cancelAcquire标记为取消
  3. 节点失效处理:在shouldParkAfterFailedAcquire中,若发现前驱节点是CANCELLED状态,会主动跳过这些失效节点

ReentrantLock释放资源分析

ReentrantLock释放资源的入口unlock方法,调用

// ReentrantLock#unlock
public void unlock() {sync.release(1);
}// AQS#release
public final boolean release(int arg) {// 调用ReentrantLock.Sync子类实现的tryRelease方法if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}

因为这篇是AQS源码阅读,这里我们简单看ReentrantLock如何tryRelease方法的

// ReentrantLock.Sync#tryRelease
// 入参 releases = 1
protected final boolean tryRelease(int releases) {// 这c,不一定是0,因为锁是可重入的,每次重入state+1int c = getState() - releases;// 只有持有锁的线程,才能释放锁if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();// free表示是否是否成功,只有c=0的时候才算释放成功boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}

所以有lock一定要有对应的unlock来减少state数量,不然就线程安全问题了💀。下面是可重入锁,但是没有unlock释放锁,导致线程获取不到锁。

public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();for (int i = 0; i < 5; i++) {new Thread(() -> {lock.lock();try {System.out.println(Thread.currentThread().getName() + ",获取到锁了。");Thread.sleep(1000);lock.lock();System.out.println("重入获取锁");// 缺少unlock} catch (Exception e) {throw new RuntimeException(e);} finally {lock.unlock();}}).start();}
}// 输出结果
Thread-0,获取到锁了。
重入获取锁

好了,主播好像又说了一堆别的内容,现在继续来说说AQS那块是怎么实现释放资源的吧 !

省流版就是:

  1. 获取同步队列中的头节点。
  2. 如果有头节点且等待状态不为0,就唤醒头结点后续的节点。
public final boolean release(int arg) {if (tryRelease(arg)) {// 获取同步队列的head节点Node h = head;// 同步队列的头节点不为空且头节节点等待状态不为0if (h != null && h.waitStatus != 0)unparkSuccessor(h);	// 唤醒head节点return true;}return false;
}

unparkSuccessor做了什么呢?省流来咯:

  1. CAS将头节点的等待状态改为0。
  2. 唤醒同步队列中最先进入不为CANCELLED的节点。
// 这里的入参node是头节点,为了方便理解下面都说头节点。
private void unparkSuccessor(Node node) {int ws = node.waitStatus;// 将头节点的等待状态设置0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 获取头节点的后继节点Node s = node.next;// 头节点没有后继节点 或 头节点的等待状态大于0(CANCELLED)// 那就从尾节点向前,不断找不最前面,不为CANCELLED的节点,并赋值给sif (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// s不空就唤醒s节点的线程if (s != null)LockSupport.unpark(s.thread);
}

以为Semaphore的角度看AQS共享锁实现

Semaphore的简单使用

Semaphore可以用来控制资源并发访问数量,可以用来做限流,下面的代码例子,我们限制每次只能由2个线程来获取共享资源。

public static void main(String[] args) {Semaphore semaphore = new Semaphore(2);for (int i = 0; i < 5; i++) {new Thread(() -> {try {// 访问共享资源semaphore.acquire();// 模拟执行业务时间2sThread.sleep(2000);System.out.println("Thread " + Thread.currentThread().getName() + " 获取到共享资源");} catch (InterruptedException e) {throw new RuntimeException(e);} finally {semaphore.release();}}).start();}
}

输出结果

Thread Thread-0 获取到共享资源
Thread Thread-1 获取到共享资源
Thread Thread-2 获取到共享资源
Thread Thread-4 获取到共享资源
Thread Thread-3 获取到共享资源

Semaphore获取共享锁的源码分析

Semaphore也是有分为公平和非公平的说法的。

// Semaphore#acquire
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)		// 	调用子类实现doAcquireSharedInterruptibly(arg);
}tryAcquireShared,让我们看看Semaphore子类是怎么实现的,Semaphore也有非公平和公平的说法。public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

公平和非公平的区别在,公平的会先判断,当前线程是否需要排队。

我们先来看看公平的实现:

// Semaphore.NonfairSync#tryAcquireShared 
protected int tryAcquireShared(int acquires) {for (;;) {// 判断当前线程是否需要排队。if (hasQueuedPredecessors())return -1;// state在这里表示共享资源的可占有数量int available = getState();// 减去本次想要占有的数量int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;    // 返回的remaining要是小于0就是抢共享资源失败的意思}
}

之前主播在ReentrantLock说的,这里要详细说说这个hasQueuedPredecessors方法

现在来具体说下这个hasQueuedPredecessors方法,判断当前线程是否需要排队。

public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;// 具体下面的判断return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}

步骤1(判断头节点和尾节点是否不同?): h != t

目的是为了快速判断当前同步队列是否为空,如果头节点和尾节点相同,说明同步队列为空,如下图所示,所以当前线程是不需要排队的。

在这里插入图片描述

步骤2(头节点的下一个节点 s 是否存在?): (s = h.next) == null

在并发场景中,可能有其他线程正在入队(比如刚设置完 tail,但还未更新 head.next),导致 h.next 暂时为 null。如下图所示。这时,认为存在并发竞争,保守判定为需要排队

在这里插入图片描述

步骤3(队列中第一个有效等待线程(s.thread)是否是当前线程?)s.thread != Thread.currentThread()

如果是当前线程 ,说明自己是队列中的第一个等待者 ,不用排队(返回 false)。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

如果不是 ,其他线程更早排队(下图thread0更早) , 必须排队(返回 true)。

在这里插入图片描述

非公平的其实很公平的差不多,少了个判断是需要排队。

// Semaphore.FairSync#tryAcquireShared
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}// Semaphore.Sync#nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) {for (;;) {// state在这里表示共享资源的可占有数量int available = getState();// 减去本次想要占有的数量int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining; // 返回的remaining要是小于0就是抢共享资源失败的意思}
}

好了,现在回到我们AQS的获取共享资源的代码里来吧!

// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)		doAcquireSharedInterruptibly(arg);	// AQS的具体获取共享资源
}

doAcquireSharedInterruptibly 方法

nNIMnhkRjz5F90tQ%253D&pos_id=img-HkcWOFLz-1745232961337)

// AQS#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 创建一个共享的Node,加入同步队列中(这里的addWaiter和ReentrantLock是一样的流程)// static final Node SHARED = new Node();final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();// 如果当前节点的前驱节点是头节点if (p == head) {// 尝试获取锁,返回的r是,后面方法的入参propagate,意思是还有多个共享资源可以占有int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r); // 将当前节点设置为头节点,并试试唤醒后继节点p.next = null; failed = false;return;}}// 获取锁失败。找到等待状态为Node.SIGNAL的前继节点 + 挂起线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}

setHeadAndPropagate方法,做了2件事:

  1. 设置当前节点为头节点
  2. 尝试唤醒后继节点
private void setHeadAndPropagate(Node node, int propagate) {// 备份head节点Node h = head;// 将当前节点设置为头节点setHead(node);// 尝试唤醒后继节点// 1.propagate > 0: 表示当前有剩余资源(如Semaphore的许可),可以继续唤醒后续线程。// 2.h == null: h 是旧的头部节点,若为 null 说明队列异常(实际极少发生)// 3.h.waitStatus < 0:表示旧头部节点处于需唤醒后续节点的状态(如 SIGNAL)。// 4.(h = head) == null || h.waitStatus < 0:重新获取下head的值再试一次if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;// 当前唤醒节点的后继节点为null或者是共享类型的Node,就唤醒后继节点。if (s == null || s.isShared())doReleaseShared();		// 具体的释放资源逻辑,下面会说}
}// setHead方法
private void setHead(Node node) {head = node;node.thread = null;node.prev = null;
}

Semaphore释放共享锁的源码分析

当我们调用release方法的时候,会释放共享资源。

// Semaphore#release
public void release() {sync.releaseShared(1);
}// AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {// 子类实现尝试释放资源if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}

让我先看看子类Semaphore是怎么实现释放资源的:

简单来说,就是CAS的,修改state。

protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}

好了,我们看看AQS那部分是怎么做的释放共享资源,跟上主播的节奏,来到doReleaseShared方法:

private void doReleaseShared() {for (;;) {Node h = head;// 当前同步队列不为空,不为刚刚初始化完。if (h != null && h != tail) {int ws = h.waitStatus;// 头节点的状态为Node.SIGNAL,就唤醒这个线程。并将当前头节点的waitState改为0。if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            unparkSuccessor(h);}// 处理头节点处于0状态的情况,确保在并发释放时唤醒信号能正确传播。// 你一定会好奇这个ws == 0是怎么来的?// 线程A将头节点状态从SIGNAL改为0并唤醒线程B。// 线程B获取资源后成为新头节点,此时线程C进入doReleaseShared(),发现新头节点状态为0else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                }// 如果head节点发生改变,说明存在竞争就需要重新判断唤醒。没有变的话就结束。if (h == head)                   break;}
}

以CutDownLunch的角度看AQS共享锁的实现

CutDownLunch的简单使用

让我简单看看CountDownLatch的使用,通过一个计数器实现线程等待,适用于“主线程等待子线程完成任务”或“多线程同时启动”等场景。

public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(3); // 初始化计数器为3// 创建并启动3个子线程for (int i = 0; i < 3; i++) {new Thread(() -> {try {System.out.println("子线程执行任务...");Thread.sleep(1000);latch.countDown(); // 任务完成后计数器减1} catch (InterruptedException e) {throw new RuntimeException(e);}}).start();}latch.await(); // 主线程等待所有子线程完成任务System.out.println("所有子线程已完成任务,主线程继续执行");
}

输出结果

子线程执行任务...
子线程执行任务...
子线程执行任务...
所有子线程已完成任务,主线程继续执行

CutDownLunch获取共享锁源码分析

让我们从CountDownLatch的await方法开始看起:

// CountDownLatch#await()
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}

AQS怎么获取共享资源的?

// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)			// 调用子类(CutDownLunch)tryAcquireShared实现doAcquireSharedInterruptibly(arg);	// 获取锁失败执行
}

让我们简单看看CutDownLunchtryAcquireShared方法

  • state == 0:返回 1,表示倒计时已完成,线程可以直接通过。
  • state > 0:返回 -1,表示倒计时未完成,线程需阻塞等待。
// CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}

我们看看AQS是怎么做的,其实如果上面你看了Semaphore的共享资源获取实现,你就会惊奇的发现,好像差不多哈。

// AQS#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {// 这里返回是1或者-1// - 返回1,线程可以直接通过。// - 返回-1,表示倒计时未完成,线程需阻塞等待。int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r); // 尝试唤醒后继节点p.next = null; failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}

setHeadAndPropagate

// 入参解释下
// node:当前新增的节点 propagate:这里是1
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; setHead(node);	// 将当前节点设置为头节点// 重点在这里// CountDownLatch中:propagate 表示是否已完全释放(即 state 是否减到 0)if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();	// }
}

CutDownLunch释放共享锁源码分析

我们看看countDown是怎么释放共享资源的。

// CountDownLatch#countDown
public void countDown() {sync.releaseShared(1);
}

CountDownLatch#countDown中调用AQS的releaseShared方法

public final boolean releaseShared(int arg) {// 子类实现tryReleaseShared方法获取共享资源if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}

doReleaseShared是AQS的实现,我们在上面的Semaphore讲过了。

这里具体看看子类CountDownLatch是怎么重写tryReleaseShared方法的。

protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;// 将state数量-1,state减到0就表示可以放行所有被await()阻塞的线程。int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}
}

以CyclicBarrier的角度看AQS条件队列

CyclicBarrier的简单使用

CyclicBarrier(循环屏障)用于让一组线程互相等待,直到所有线程都到达某个屏障点后,再一起继续执行。

public static void main(String[] args) {int threadCount = 3;// 创建 CyclicBarrier,指定等待的线程数和到达屏障后的回调动作CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {System.out.println("所有线程已到达屏障,开始下一阶段");});for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 开始第一阶段任务");Thread.sleep(1000);barrier.await(); // 等待其他线程到达屏障System.out.println(Thread.currentThread().getName() + " 开始第二阶段任务");Thread.sleep(1000);barrier.await(); // 再次等待System.out.println(Thread.currentThread().getName() + " 完成所有任务");} catch (Exception e) {e.printStackTrace();}}, "线程" + (i + 1)).start();}
}

CyclicBarrier循环屏障源码分析

await方法

// CyclicBarrier#await
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe);}
}

dowait方法

// CyclicBarrier#dowait
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;// 这里加锁是为了使用Condition的await(),具体原因会在ReentrantLock源码解读中说。lock.lock();try {// generation代的概念,线程数量达到构造CyclicBarrier传的parties数量,可以执行构造时传的任务,这就是1代。final Generation g = generation;// 标记屏障是否被破坏,如果屏障被破坏,其他等待线程需要立即感知到这一状态,而不是无限等待。if (g.broken)throw new BrokenBarrierException();// 出现线程被中断,主动破坏屏障(breakBarrier()),唤醒所有等待线程,避免它们无限等待if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// 需要打破屏障的线程数量--// 数量为0,就是打破屏障,执行构造CyclicBarrier传的任务。int index = --count;if (index == 0) {  boolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();	// 执行任务ranAction = true;	nextGeneration();	// 复原,准备下一代的数据return 0;} finally {if (!ranAction)breakBarrier();}}// 如果没有达到打破屏障的线程数,那就挂起这个线程for (;;) {try {// 是否启用超时机制if (!timed)trip.await();		// 释放锁 + 挂起线程else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();	// 释放锁}
}

await方法,这里会涉及到线程的挂起和锁的释放。

public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();	  // 加入到条件队列中int savedState = fullyRelease(node);  // 释放当前线程持有的锁int interruptMode = 0;// 如果不在同步队列中,那么就挂起这个线程。while (!isOnSyncQueue(node)) {LockSupport.park(this);		// 挂起线程if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}

addConditionWaiter方法,创建Node.CONDITION的Node,添加到条件队列里面。

下图模拟了条件队列的可能的添加新Node的情况。
在这里插入图片描述
在这里插入图片描述

private Node addConditionWaiter() {Node t = lastWaiter;// 有最后一个节点,且等待状态不为Node.CONDITIONif (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}

条件队列转同步队列

比如,我们在await的线程数量,达到可以打破屏障的时候,我们会执行nextGeneration方法。这时候就会涉及到条件队列转同步队列。

int index = --count;
if (index == 0) {  boolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();	ranAction = true;	nextGeneration();	// 唤醒所有线程,从条件队列进入到同步队列

nextGeneration方法,会为下一代准备数据。

// CyclicBarrier#nextGeneration
private void nextGeneration() {// 重点在这里,唤醒线程trip.signalAll();count = parties;	// 复原打破屏障的数量 generation = new Generation();	// 创建新的一代
}// AbstractQueuedSynchronizer#signalAll
public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);		// 重点在这里
}

doSignalAll,这里会逐步的将每个节点都放入,放入到同步队列种。

在这里插入图片描述

// AbstractQueuedSynchronizer#doSignalAll
private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);
}

具体转换同步队列

// AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {// CAS将等待状态修改从-2到0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 加入同步队列,enq具体做啥之前说过了(‾◡◝)Node p = enq(node);int ws = p.waitStatus;// 等待状态为cancel 或者 cas修改等待状态Node.SIGNAL失败。就唤醒node的线程。if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}// AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) {if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}

后话

怎么样?有毅力的你,能看到这里,你真的非常的厉害了,给你一个👍

通过对ReentrantLock独占锁的分析,聪明的你一定明白了Node是什么?Node.SIGNAL的意思,AQS的同步队列是什么样子的,怎么加入同步队列的?

通过Semaphore、CutDownLunch共享锁的分析,聪明的你一定明白了Node.PROPAGATE是干嘛的,它们是怎么基于AQS实现共享模式的?nextWaite是用来区分独占和共享模式的字段。

通过CyclicBarrier的分析,我们知道了条件队列,和AQS实现的条件队列转同步队列。

最后的最后

其实还有很多内容还是可以补充的,也欢迎各位大佬指出我的不足🙇‍♂️🙇‍♂️🙇‍♂️

相关文章:

【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码

&#x1f44b;hi&#xff0c;我不是一名外包公司的员工&#xff0c;也不会偷吃茶水间的零食&#xff0c;我的梦想是能写高端CRUD &#x1f525; 2025本人正在沉淀中… 博客更新速度 &#x1f44d; 欢迎点赞、收藏、关注&#xff0c;跟上我的更新节奏 &#x1f4da;欢迎订阅专栏…...

k8s介绍与实践

第一节 理论 基础介绍&#xff0c;部署实践&#xff0c;操作实践&#xff0c;点击这里学习 第二节 dashboard操作 查看安装的dashboard服务信息 kubectl get pod,svc -n kubernetes-dashboard 网页登录地址&#xff1a;https://server_ip:30976/#/login 创建token kube…...

SpringBoot 3 与 SpringDoc 打造完美接口文档

文章目录 1. SpringDoc 简介1.1 SpringDoc 优势2. 环境准备2.1 Maven 依赖2.2 基础配置3. 创建基本文档配置类4. 控制器 API 文档注解4.1 基本控制器示例4.2 模型类示例5. 高级功能5.1 API分组5.2 安全配置5.3 隐藏特定端点6. 参数描述6.1 路径参数6.2 查询参数6.3 请求体7. 响…...

【HFP】蓝牙HFP协议音频连接核心技术深度解析

目录 一、音频连接建立的总体要求 1.1 发起主体与时机 1.2 前提条件 1.3 同步连接的建立 1.4 通知机制 二、不同主体发起的音频连接建立流程 2.1 连接建立触发矩阵 2.2 AG 发起的音频连接建立 2.3 HF 发起的音频连接建立 三、编解码器连接建立流程 3.1 发起条件 3.…...

KRaft面试思路引导

Kafka实在2.8之后就用KRaft进行集群管理了 Conroller负责选举Leader&#xff0c;同时Controller管理集群元数据状态信息&#xff0c;并将元数据信息同步给各个分区的Leader 和Zookeeper管理一样&#xff0c;会选出一个Broker作为Controller去管理整个集群&#xff0c;但是元数…...

FreeRTOS菜鸟入门(六)·移植FreeRTOS到STM32

目录 1. 获取裸机工程模版 2. 下载 FreeRTOS V9.0.0 源码 3. FreeRTOS文件夹内容简介 3.1 FreeRTOS文件夹 3.1.1 Demo文件夹 3.1.2 License 文件夹 3.1.3 Source 文件夹 3.2 FreeRTOS-Plus 文件夹 4. 往裸机工程添加 FreeRTOS 源码 5. 拷贝 FreeRTOSConfig…...

14.第二阶段x64游戏实战-分析人物的名字

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 上一个内容&#xff1a;13.第二阶段x64游戏实战-分析人物等级和升级经验 名字&#xff08;中文英文符号…...

【CS*N是狗】亲测可用!!WIN11上禁用Chrome自动更新IDM插件

现象&#xff1a;每次打开chrome后IDM会弹出提示插件版本不一致。经过排查后发现是chrome把IDM插件给更新了&#xff0c;导致IDM提示版本不匹配。经过摸索后&#xff0c;得到了可行的方案。 第一步&#xff0c;打开Chrome&#xff0c;把IDM插件卸载掉&#xff0c;然后重新安装I…...

漫游git rebase + 浅谈git checkout和git branch -f的分支命令

今天学了两个命令非常有意思&#xff1a;一个是git checkout&#xff0c;一个是git branch -f。我们可以认为在提交树上&#xff0c;任何一个节点代表着一次提交。并且&#xff0c;git commit将会在 H E A D HEAD HEAD指针指向的节点上进行进一步提交。将每一个分支名视为标记当…...

深入理解 React 组件的生命周期:从创建到销毁的全过程

React 作为当今最流行的前端框架之一&#xff0c;其组件生命周期是每个 React 开发者必须掌握的核心概念。本文将全面剖析 React 组件的生命周期&#xff0c;包括类组件的各个生命周期方法和函数组件如何使用 Hooks 模拟生命周期行为&#xff0c;帮助开发者编写更高效、更健壮的…...

基础数学知识-概率论

文章目录 1. 随机事件和概率1. 事件运算规律2. 条件概率3. 事件独立性4. 五大公式5. 古典型概率6. 几何型概率7. n重伯努利试验2. 随机变量与分布1.离散型随机变量2. 连续型随机变量3. 常见分布4. TODO4. 随机变量的数学特征1. 数学期望2. 方差3. 常见分布期望与方差 -- TODO4.…...

OpenCV 图形API(44)颜色空间转换-----将图像从 BGR 色彩空间转换为 RGB 色彩空间函数BGR2RGB()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 将图像从BGR色彩空间转换为RGB色彩空间。 该函数将输入图像从BGR色彩空间转换为RGB。B、G和R通道值的常规范围是0到255。 输出图像是8位无符号3通…...

2025年CMS安全(面试题)

活动发起人小虚竹 想对你说&#xff1a; 这是一个以写作博客为目的的创作活动&#xff0c;旨在鼓励大学生博主们挖掘自己的创作潜能&#xff0c;展现自己的写作才华。如果你是一位热爱写作的、想要展现自己创作才华的小伙伴&#xff0c;那么&#xff0c;快来参加吧&#xff01…...

配置nginx服务,通过多ip区分多网站

首先关闭防火墙,setenforce 0 关过了,不截图了 多IP,首先配置多个IP地址 可以在vm增加虚拟网卡,也可以在同一网卡配置多个IP,我用第一种 记得点确定 查看新的虚拟网卡IP 没有IP,配置一个 安装nginx 写配置 server{listen 192.168.214.130:80;root /www/ip/130; # 资源根目…...

[k8s实战]Containerd 1.7.2 离线安装与配置全指南(生产级优化)

[k8s实战]Containerd 1.7.2 离线安装与配置全指南&#xff08;生产级优化&#xff09; 摘要&#xff1a;本文详细讲解在无外网环境下部署 Containerd 1.7.2 容器运行时的完整流程&#xff0c;涵盖二进制包安装、私有镜像仓库配置、Systemd服务集成等关键步骤&#xff0c;并提供…...

C++中const与constexpr的区别

在C中&#xff0c;const和constexpr都用于定义常量&#xff0c;但它们的用途和行为有显著区别&#xff1a; ### 1. **初始化时机** - **const**&#xff1a;表示变量是只读的&#xff0c;但其值可以在**编译时或运行时**初始化。 cpp const int a 5; // 编译…...

解决Windows安全中心显示空白页面

1、电脑重装系统后&#xff0c;发现原本一些软件打不开了&#xff0c;电脑莫名认为有病毒&#xff0c;自动删除插件。附图。 2、第一反应是电脑防火墙的原因&#xff0c;默认威胁防护识别到了病毒软件&#xff0c;自动删除。在开始屏幕搜Windows安全中心&#xff0c;打开之后发…...

三国战纪119通关笔记

文章目录 诸葛一币通关孙姬马王夏侯渊孟优夏侯惇张辽貂蝉吕蒙沙摩柯破阵及吕布陆逊左慈(跳)许褚黄盖彻里吉(跳)魏延(跳)司马懿曹操道具的安排道具-天师符(废弃)道具-九节杖道具-援军令(废弃)道具-土雷(废弃) 笔者是个菜鸡&#xff0c;什么天王难度无伤&#xff0c;天王难度5禁什…...

Android audio系统五 AudioPolicy 策略配置详解

引用&#xff1a;Android 音频策略配置文件解析流程 audio_policy_configuration.xml 是 Android 音频系统的核心配置文件&#xff0c;它定义了音频硬件接口、设备路由和基本策略。下面我将详细介绍这个文件的结构、关键配置项和实际应用。audio_policy_configuration.xml 是 …...

【MQ篇】初识MQ!

目录 一、什么是MQ&#xff1f;简单来说就是个“快递中转站” &#x1f4e6;二、为什么要用MQ&#xff1f;用了它&#xff0c;好处多多&#xff01;&#x1f929;三、MQ的应用场景&#xff1a;各行各业都能用&#xff01;&#x1f30d;四、MQ的优缺点&#xff1a;硬币的两面&am…...

2、SpringAI接入ChatGPT与微服务整合

2、SpringAI接入ChatGPT与微服务整合 小薛博客AI 大模型资料 1、SpringAI简介 https://spring.io/projects/spring-ai Spring AI是一个人工智能工程的应用框架。其目标是将Spring生态系统的设计原则&#xff08;如可移植性和模块化设计&#xff09;应用于人工智能领域&#…...

【Linux】多进程任务模块

创建多个进程&#xff0c;同时完成任务 task.c #include <sys/types.h> #include <unistd.h> #include<stdio.h> #include <sys/wait.h> int create_process_tasks(Task_fun_t tasks[],int tsak_cnt) {pid_t pid;int i 0;for(i 0;i < 4;i){pid …...

榕壹云预约咨询系统:基于ThinkPHP+MySQL+UniApp打造的灵活预约小程序解决方案

数字化咨询场景的痛点与解决方案 在心理咨询、医疗问诊、法律咨询等需要预约服务的场景中&#xff0c;传统线下预约存在效率低、管理复杂、资源分配不均等问题。榕壹云预约咨询系统基于ThinkPHPMySQLUniApp技术栈开发&#xff0c;为咨询类行业提供了一套高效、安全、可扩展的数…...

鸿蒙NEXT开发LRUCache缓存工具类(单例模式)(ArkTs)

import { util } from kit.ArkTS;/*** LRUCache缓存工具类&#xff08;单例模式&#xff09;* author 鸿蒙布道师* since 2025/04/21*/ export class LRUCacheUtil {private static instance: LRUCacheUtil;private lruCache: util.LRUCache<string, any>;/*** 私有构造函…...

opencv 图像矫正的原理

图像矫正的原理是透视变换&#xff0c;下面来介绍一下透视变换的概念。 听名字有点熟&#xff0c;我们在图像旋转里接触过仿射变换&#xff0c;知道仿射变换是把一个二维坐标系转换到另一个二维坐标系的过程&#xff0c;转换过程坐标点的相对位置和属性不发生变换&#xff0c;…...

计算机前沿技术课程论文 K-means算法在图像处理的应用

K-means算法在图像处理的应用 这是本人在计算机前沿技术课程中的课程论文文章&#xff0c;为了方便大家参考学习&#xff0c;我把完整的论文word文档发到了我的资源里&#xff0c;有需要的可以自取。 点击完整资源链接 目录 K-means算法在图像处理的应用摘要&#xff1a;引言1…...

【股票数据API接口37】如何获取股票指数实时数据之Python、Java等多种主流语言实例代码演示通过股票数据接口获取数据

​ 如今&#xff0c;量化分析在股市领域风靡一时&#xff0c;其核心要素在于数据&#xff0c;获取股票数据&#xff0c;是踏上量化分析之路的第一步。你可以选择亲手编写爬虫来抓取&#xff0c;但更便捷的方式&#xff0c;莫过于利用专业的股票数据API接口。自编爬虫虽零成本&a…...

【仓颉 + 鸿蒙 + AI Agent】CangjieMagic框架(17):PlanReactExecutor

CangjieMagic框架&#xff1a;使用华为仓颉编程语言编写&#xff0c;专门用于开发AI Agent&#xff0c;支持鸿蒙、Windows、macOS、Linux等系统。 这篇文章剖析一下 CangjieMagic 框架中的 PlanReactExecutor。 1 PlanReactExecutor的工作原理 #mermaid-svg-OqJUCSoxZkzylbDY…...

docker harbor私有仓库登录报错

docker harbor私有仓库登录报错如下&#xff1a; [rootsrv-1 ~]# docker login -u user1 -p pwd1 harbor.chinacloudapi.cn WARNING! Using --password via the CLI is insecure. Use --password-stdin. Error response from daemon: Get "https://harbor.chinacloudapi.…...

IQ信号和实信号的关系与转换的matlab实现

IQ信号 IQ信号通常是指两路正交的信号(I路和Q路),在实际信号采样中,通常会进行IQ采样,将实信号转换为复基带信号进行存储。 IQ信号转实信号 IQ信号转为实信号,其实就是将IQ两路正交信号通过上变频合并为一个实数的带通信号,这通常在通信系统中用于将基带信号调制到载…...